# 3. Guide - 1. Keras - 4. Writing layers and models with TensorFlow Keras

목차

1. Setup
2. The Layer class
 1. Layers encapsulate a state (weights) and some computation
 2. Best practice: deferring weight creation until the shape of the inputs is known
 3. Layers are recursively composable
 4. Layers recursively collect losses created during the forward pass
 5. You can optionally enable serialization on your layers
 6. Privileged Training Argument in the call method
3. Building Models
 1. The Model class
 2. Putting it all together: an end-to-end example
 3. Beyond object-oriented development: the Functional API

## 1. Setup

In [1]:
# Python 2/3 compatibility imports, followed by the TensorFlow import used throughout.
from __future__ import absolute_import, division, print_function, unicode_literals
import tensorflow as tf
# Clear Keras' global session state before anything is built in this notebook.
tf.keras.backend.clear_session()

In [2]:
# Record the TensorFlow version this notebook was executed with.
print(tf.__version__)

2.0.0-alpha0


## 2. The Layer class
### 1. Layers encapsulate a state (weights) and some computation

In [3]:
from tensorflow.keras import layers

class Linear(layers.Layer):
    """Linear layer computing ``matmul(inputs, w) + b``.

    Both weights are created directly as ``tf.Variable`` objects in the
    constructor, so the input dimension must be known up front.

    Args:
        units: Number of output columns of the kernel ``w``.
        input_dim: Number of rows of the kernel ``w``.
    """

    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        # mean=0, stddev=0.05, only float types are supported
        kernel_initializer = tf.random_normal_initializer()
        kernel_value = kernel_initializer(shape=(input_dim, units), dtype='float32')
        self.w = tf.Variable(initial_value=kernel_value, trainable=True)

        bias_initializer = tf.zeros_initializer()
        bias_value = bias_initializer(shape=(units,), dtype='float32')
        self.b = tf.Variable(initial_value=bias_value, trainable=True)

    def call(self, inputs):
        """Print both weights, then return ``matmul(inputs, w) + b``."""
        # The prints are part of the demo: they show the variables on every call.
        print(self.w)
        print(self.b)
        projected = tf.matmul(inputs, self.w)
        return projected + self.b
    
# Feed a 2x2 batch of ones through a Linear layer with 4 units and input_dim 2.
x = tf.ones(shape=(2, 2))
linear_layer = Linear(units=4, input_dim=2)
y = linear_layer(x)
print(y)

<tf.Variable 'Variable:0' shape=(2, 4) dtype=float32, numpy=
array([[-0.02438336,  0.0277977 , -0.06955899,  0.0714218 ],
       [ 0.05887324, -0.01789667, -0.05431752, -0.10398325]],
      dtype=float32)>
<tf.Variable 'Variable:0' shape=(4,) dtype=float32, numpy=array([0., 0., 0., 0.], dtype=float32)>
tf.Tensor(
[[ 0.03448988  0.00990104 -0.12387651 -0.03256144]
 [ 0.03448988  0.00990104 -0.12387651 -0.03256144]], shape=(2, 4), dtype=float32)


In [4]:
# The variables assigned to `w` and `b` are exposed, in that order, through `weights`.
assert linear_layer.weights == [linear_layer.w, linear_layer.b]

In [5]:
# Display the list of variables tracked by the layer.
linear_layer.weights

[<tf.Variable 'Variable:0' shape=(2, 4) dtype=float32, numpy=
 array([[-0.02438336,  0.0277977 , -0.06955899,  0.0714218 ],
        [ 0.05887324, -0.01789667, -0.05431752, -0.10398325]],
       dtype=float32)>,
 <tf.Variable 'Variable:0' shape=(4,) dtype=float32, numpy=array([0., 0., 0., 0.], dtype=float32)>]

In [6]:
from tensorflow.keras import layers

class Linear(layers.Layer):
    """Linear layer computing ``matmul(inputs, w) + b``.

    Same layer as the ``tf.Variable`` version, but the weights are created
    through the ``add_weight`` shortcut with named initializers.

    Args:
        units: Number of output columns of the kernel ``w``.
        input_dim: Number of rows of the kernel ``w``.
    """

    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        kernel_shape = (input_dim, units)
        bias_shape = (units,)
        self.w = self.add_weight(shape=kernel_shape, initializer='random_normal', trainable=True)
        self.b = self.add_weight(shape=bias_shape, initializer='zeros', trainable=True)

    def call(self, inputs):
        """Print both weights, then return ``matmul(inputs, w) + b``."""
        # The prints are part of the demo: they show the variables on every call.
        print(self.w)
        print(self.b)
        projected = tf.matmul(inputs, self.w)
        return projected + self.b
    
# Feed a 2x2 batch of ones through a Linear layer with 4 units and input_dim 2.
x = tf.ones(shape=(2, 2))
linear_layer = Linear(units=4, input_dim=2)
y = linear_layer(x)
print(y)

<tf.Variable 'Variable:0' shape=(2, 4) dtype=float32, numpy=
array([[-0.10860167, -0.04979282, -0.06922183,  0.04520001],
       [-0.06313514, -0.06170268,  0.05446358, -0.00530851]],
      dtype=float32)>
<tf.Variable 'Variable:0' shape=(4,) dtype=float32, numpy=array([0., 0., 0., 0.], dtype=float32)>
tf.Tensor(
[[-0.1717368  -0.11149549 -0.01475825  0.0398915 ]
 [-0.1717368  -0.11149549 -0.01475825  0.0398915 ]], shape=(2, 4), dtype=float32)


#### Layers can have non-trainable weights

Besides trainable weights, you can add non-trainable weights to a layer as well. Such weights are meant not to be taken into account during backpropagation, when you are training the layer.

In [7]:
class ComputeSum(layers.Layer):
    """Layer that accumulates the axis-0 sum of its inputs in a non-trainable variable.

    Args:
        input_dim: Length of the accumulator vector ``total``.
    """

    def __init__(self, input_dim):
        super(ComputeSum, self).__init__()
        # The shape is passed as a real 1-tuple; `(input_dim)` is only a
        # parenthesised int and reads as a shape by accident.
        self.total = tf.Variable(initial_value=tf.zeros((input_dim,)),
                                 trainable=False)

    def call(self, inputs):
        """Add ``reduce_sum(inputs, axis=0)`` to ``total`` and return ``total``.

        The accumulator is printed before and after the update so that the
        state change is visible in the cell output.
        """
        print(self.total)
        self.total.assign_add(tf.reduce_sum(inputs, axis=0))
        print(self.total)
        return self.total
    
# Call the layer twice on the same input to show that `total` keeps accumulating.
x = tf.ones(shape=(2, 2))
my_sum = ComputeSum(input_dim=2)

y = my_sum(x)
print(y.numpy())

y = my_sum(x)
print(y.numpy())

<tf.Variable 'Variable:0' shape=(2,) dtype=float32, numpy=array([0., 0.], dtype=float32)>
<tf.Variable 'Variable:0' shape=(2,) dtype=float32, numpy=array([2., 2.], dtype=float32)>
[2. 2.]
<tf.Variable 'Variable:0' shape=(2,) dtype=float32, numpy=array([2., 2.], dtype=float32)>
<tf.Variable 'Variable:0' shape=(2,) dtype=float32, numpy=array([4., 4.], dtype=float32)>
[4. 4.]


In [8]:
# Sum of `x` over axis 0: the same expression ComputeSum adds to `total` on each call.
tf.reduce_sum(x, axis=0)

<tf.Tensor: id=115, shape=(2,), dtype=float32, numpy=array([2., 2.], dtype=float32)>

In [9]:
# `total` was created with trainable=False: compare the three weight counts below.
print('weights:', len(my_sum.weights))
print('non-trainable weights:', len(my_sum.non_trainable_weights))

print('trainable weights:', len(my_sum.trainable_weights))

weights: 1
non-trainable weights: 1
trainable weights: 0


In [10]:
# Same inspection as the count-based cell, but printing the variables themselves.
print('weights:', my_sum.weights)
print('non-trainable weights:', my_sum.non_trainable_weights)

# NOTE(review): this line still prints a count while the two lines above print
# the lists themselves - confirm whether that is intentional.
print('trainable weights:', len(my_sum.trainable_weights))

weights: [<tf.Variable 'Variable:0' shape=(2,) dtype=float32, numpy=array([4., 4.], dtype=float32)>]
non-trainable weights: [<tf.Variable 'Variable:0' shape=(2,) dtype=float32, numpy=array([4., 4.], dtype=float32)>]
trainable weights: 0


### 2. Best practice: deferring weight creation until the shape of the inputs is known

In [11]:
class Linear(layers.Layer):
    """Constructor-only variant of Linear that creates its weights eagerly.

    Because ``w`` is created in ``__init__``, the caller has to supply
    ``input_dim`` before any input has been seen. No ``call`` is defined here.

    Args:
        units: Number of output columns of the kernel ``w``.
        input_dim: Number of rows of the kernel ``w``.
    """

    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        kernel_shape = (input_dim, units)
        bias_shape = (units,)
        self.w = self.add_weight(shape=kernel_shape, initializer='random_normal', trainable=True)
        self.b = self.add_weight(shape=bias_shape, initializer='zeros', trainable=True)

In [12]:
class Linear(layers.Layer):
    """Linear layer whose weights are created lazily in ``build``.

    Args:
        units: Number of output columns of the kernel ``w``.
    """

    def __init__(self, units=32):
        super(Linear, self).__init__()
        self.units = units

    def build(self, input_shape):
        """Create ``w`` and ``b``; the kernel's row count is ``input_shape[-1]``."""
        in_features = input_shape[-1]
        self.w = self.add_weight(shape=(in_features, self.units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units,),
                                 initializer='zeros',
                                 trainable=True)

    def call(self, inputs):
        """Return ``matmul(inputs, w) + b``."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b

In [13]:
# Only `units` is given here; the layer is then called on a 2x2 batch of ones.
x = tf.ones(shape=(2, 2))
linear_layer = Linear(units=32)
y = linear_layer(x)
print(y)

tf.Tensor(
[[-0.00549616 -0.00095546 -0.13881332  0.11873589 -0.10616826  0.00998187
  -0.04270286  0.06722576  0.05021397 -0.07095472  0.04934657 -0.05055718
  -0.02400014 -0.07064343 -0.02878763  0.03326192 -0.11734097  0.11738516
   0.10818116  0.11819483  0.00040014  0.03704372  0.17260377 -0.04371251
   0.11862691 -0.08412854  0.10576034 -0.00344074 -0.06683973  0.00077652
  -0.04206975 -0.00267351]
 [-0.00549616 -0.00095546 -0.13881332  0.11873589 -0.10616826  0.00998187
  -0.04270286  0.06722576  0.05021397 -0.07095472  0.04934657 -0.05055718
  -0.02400014 -0.07064343 -0.02878763  0.03326192 -0.11734097  0.11738516
   0.10818116  0.11819483  0.00040014  0.03704372  0.17260377 -0.04371251
   0.11862691 -0.08412854  0.10576034 -0.00344074 -0.06683973  0.00077652
  -0.04206975 -0.00267351]], shape=(2, 32), dtype=float32)


### 3. Layers are recursively composable

In [14]:
class MLPBlock(layers.Layer):
    """Stack of three Linear layers (32, 32 and 1 units) with ReLU after the first two."""

    def __init__(self):
        super(MLPBlock, self).__init__()
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(1)

    def call(self, inputs):
        """Apply linear_1 -> relu -> linear_2 -> relu -> linear_3."""
        hidden = tf.nn.relu(self.linear_1(inputs))
        hidden = tf.nn.relu(self.linear_2(hidden))
        return self.linear_3(hidden)
    
# Call the block once on a (3, 64) batch of ones, then report how many
# variables the outer layer exposes from its three inner layers.
mlp = MLPBlock()
y = mlp(tf.ones(shape=(3, 64)))
print('weights:', len(mlp.weights))
print('trainable_weights:', len(mlp.trainable_weights))
# Print mlp.weights / mlp.trainable_weights directly to inspect the variables themselves.
print(y)

weights: 6
trainable_weights: 6
tf.Tensor(
[[0.01623996]
 [0.01623996]
 [0.01623996]], shape=(3, 1), dtype=float32)


### 4. Layers recursively collect losses created during the forward pass

In [15]:
class ActivityRegularizationLayer(layers.Layer):
    """Pass-through layer that registers ``rate * reduce_sum(inputs)`` as a loss.

    Args:
        rate: Multiplier applied to the summed inputs.
    """

    def __init__(self, rate=1e-2):
        super(ActivityRegularizationLayer, self).__init__()
        self.rate = rate

    def call(self, inputs):
        """Record the activity penalty via ``add_loss`` and return ``inputs`` unchanged."""
        penalty = self.rate * tf.reduce_sum(inputs)
        self.add_loss(penalty)
        return inputs

In [16]:
class OuterLayer(layers.Layer):
    """Wrapper layer that delegates its forward pass to an ActivityRegularizationLayer."""

    def __init__(self):
        super(OuterLayer, self).__init__()
        self.activity_reg = ActivityRegularizationLayer(rate=1e-2)

    def call(self, inputs):
        """Run the inner layer, which adds its loss during the call."""
        outputs = self.activity_reg(inputs)
        return outputs
    
layer = OuterLayer()
# The layer has not been called yet, so no loss has been recorded.
assert len(layer.losses) == 0

# tf.zeros takes the shape as one argument; its second positional argument is
# dtype, so `tf.zeros(1, 1)` does not build a (1, 1) tensor. Pass a tuple.
_ = layer(tf.zeros((1, 1)))
assert len(layer.losses) == 1

# A second call still leaves exactly one loss, as the assertion checks.
_ = layer(tf.zeros((1, 1)))
assert len(layer.losses) == 1

In [17]:
# Display the losses currently held by the outer layer.
layer.losses

[<tf.Tensor: id=246, shape=(), dtype=float32, numpy=0.0>]

In [18]:
class OuterLayer(layers.Layer):
    """Wrapper around a 32-unit Dense layer whose kernel has an l2(1e-3) regularizer."""

    def __init__(self):
        super(OuterLayer, self).__init__()
        kernel_penalty = tf.keras.regularizers.l2(1e-3)
        self.dense = layers.Dense(32, kernel_regularizer=kernel_penalty)

    def call(self, inputs):
        """Forward the inputs through the inner Dense layer."""
        outputs = self.dense(inputs)
        return outputs
    
# Call the layer once on a (1, 1) tensor of zeros, then print the losses it reports.
layer = OuterLayer()
_ = layer(tf.zeros(shape=(1, 1)))

print(layer.losses)

[<tf.Tensor: id=284, shape=(), dtype=float32, numpy=0.0019819872>]


```python
# Illustrative training loop: `model` and `train_dataset` are placeholders that
# are not defined in the preceding cells.
optimizer = tf.keras.optimizers.SGD(1e-3)
# Only `tf` is imported in this notebook, so the loss is reached via `tf.keras`.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

for x_batch_train, y_batch_train in train_dataset:
    with tf.GradientTape() as tape:
        # Forward pass through the same object whose losses and weights are used below.
        logits = model(x_batch_train)
        loss_value = loss_fn(y_batch_train, logits)
        # Add the extra losses exposed by the model after this forward pass.
        loss_value += sum(model.losses)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
```

### 5. You can optionally enable serialization on your layers

In [19]:
class Linear(layers.Layer):
    """Lazily built linear layer that can describe itself through ``get_config``.

    Args:
        units: Number of output columns of the kernel ``w``.
    """

    def __init__(self, units=32):
        super(Linear, self).__init__()
        self.units = units

    def build(self, input_shape):
        """Create ``w`` and ``b``, both with the 'random_normal' initializer."""
        in_features = input_shape[-1]
        self.w = self.add_weight(shape=(in_features, self.units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units,),
                                 initializer='random_normal',
                                 trainable=True)

    def call(self, inputs):
        """Return ``matmul(inputs, w) + b``."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b

    def get_config(self):
        """Return a dict holding only the ``units`` constructor argument."""
        config = {'units': self.units}
        return config
    
# Round trip: export the layer's config, show it, and rebuild a layer from it.
layer = Linear(units=64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)

{'units': 64}


In [20]:
class Linear(layers.Layer):
    """Lazily built linear layer whose config includes the base Layer's config.

    Args:
        units: Number of output columns of the kernel ``w``.
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    def __init__(self, units=32, **kwargs):
        super(Linear, self).__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        """Create ``w`` and ``b``, both with the 'random_normal' initializer."""
        in_features = input_shape[-1]
        self.w = self.add_weight(shape=(in_features, self.units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units,),
                                 initializer='random_normal',
                                 trainable=True)

    def call(self, inputs):
        """Return ``matmul(inputs, w) + b``."""
        projected = tf.matmul(inputs, self.w)
        return projected + self.b

    def get_config(self):
        """Return the base layer's config extended with the ``units`` entry."""
        config = super(Linear, self).get_config()
        config['units'] = self.units
        return config
    
# Round trip: export the layer's config, show it, and rebuild a layer from it.
layer = Linear(units=64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)

{'name': 'linear_8', 'trainable': True, 'dtype': None, 'units': 64}


In [21]:
def from_config(cls, config):
    """Instantiate ``cls`` by passing every entry of ``config`` as a keyword argument.

    Args:
        cls: Callable (typically a class) to instantiate.
        config: Mapping of keyword-argument names to values.

    Returns:
        Whatever ``cls(**config)`` returns.
    """
    instance = cls(**config)
    return instance

### 6. Privileged Training Argument in the call method

In [22]:
class CustomDropout(layers.Layer):
    """Dropout layer that is only active when ``call`` receives a truthy ``training``.

    Args:
        rate: Value passed as ``rate`` to ``tf.nn.dropout``.
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    def __init__(self, rate, **kwargs):
        super(CustomDropout, self).__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        """Return ``inputs`` untouched unless ``training`` is truthy, then apply dropout."""
        if not training:
            return inputs
        return tf.nn.dropout(inputs, rate=self.rate)

## 3. Building Models
### 1. The Model class

```python
class ResNet(tf.keras.Model):
    """Illustrative model: two ResNetBlocks, global average pooling, then a classifier.

    `ResNetBlock` and `num_classes` are placeholders that this snippet does not define.
    """

    def __init__(self):
        super(ResNet, self).__init__()
        self.block_1 = ResNetBlock()
        self.block_2 = ResNetBlock()
        self.global_pool = layers.GlobalAveragePooling2D()
        # `Dense` is never imported on its own in this notebook; reach it
        # through `layers`, like the pooling layer above.
        self.classifier = layers.Dense(num_classes)

    def call(self, inputs):
        """Apply block_1 -> block_2 -> global_pool -> classifier."""
        x = self.block_1(inputs)
        x = self.block_2(x)
        x = self.global_pool(x)
        return self.classifier(x)
    
# Illustrative usage only: `dataset` and `filepath` are placeholders.
resnet = ResNet()
dataset = ...
resnet.fit(dataset, epochs=10)
resnet.save_weights(filepath)
```

### 2. Putting it all together: an end-to-end example

In [23]:
class Sampling(layers.Layer):
    """Draws ``z = z_mean + exp(0.5 * z_log_var) * epsilon`` with random-normal ``epsilon``."""

    def call(self, inputs):
        """Sample from a ``(z_mean, z_log_var)`` pair.

        ``epsilon`` is shaped from the first two dimensions of ``z_mean``.
        """
        z_mean, z_log_var = inputs
        mean_shape = tf.shape(z_mean)
        noise = tf.keras.backend.random_normal(shape=(mean_shape[0], mean_shape[1]))
        scale = tf.exp(0.5 * z_log_var)
        return z_mean + scale * noise
    
class Encoder(layers.Layer):
    """Maps inputs to the triplet ``(z_mean, z_log_var, z)``.

    Args:
        latent_dim: Units of the ``dense_mean`` and ``dense_log_var`` layers.
        intermediate_dim: Units of the ReLU projection layer.
        name: Layer name passed to the base ``Layer``.
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    def __init__(self, latent_dim=32, intermediate_dim=64, name='encoder', **kwargs):
        super(Encoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        """Project the inputs, compute mean and log-variance, then sample ``z``."""
        hidden = self.dense_proj(inputs)
        z_mean = self.dense_mean(hidden)
        z_log_var = self.dense_log_var(hidden)
        return z_mean, z_log_var, self.sampling((z_mean, z_log_var))
    
class Decoder(layers.Layer):
    """Maps a latent vector to a sigmoid-activated output of size ``original_dim``.

    Args:
        original_dim: Units of the sigmoid output layer.
        intermediate_dim: Units of the ReLU projection layer.
        name: Layer name passed to the base ``Layer``.
        **kwargs: Forwarded unchanged to the base ``Layer`` constructor.
    """

    def __init__(self, original_dim, intermediate_dim=64, name='decoder', **kwargs):
        super(Decoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
        self.dense_output = layers.Dense(original_dim, activation='sigmoid')

    def call(self, inputs):
        """Apply the ReLU projection followed by the sigmoid output layer."""
        hidden = self.dense_proj(inputs)
        return self.dense_output(hidden)
    
class VariationalAutoEncoder(tf.keras.Model):
    """Chains an Encoder and a Decoder and registers a KL term through ``add_loss``.

    Args:
        original_dim: Output size handed to the Decoder; also stored on the model.
        intermediate_dim: Hidden size shared by the Encoder and the Decoder.
        latent_dim: Latent size handed to the Encoder.
        name: Model name passed to the base ``Model``.
        **kwargs: Forwarded unchanged to the base ``Model`` constructor.
    """

    def __init__(self, original_dim, intermediate_dim=64, latent_dim=32,
                 name='autoencoder', **kwargs):
        super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

    def call(self, inputs):
        """Encode, decode the sampled ``z``, add the KL loss, return the reconstruction."""
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        kl_terms = z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
        self.add_loss(-0.5 * tf.reduce_mean(kl_terms))
        return reconstructed
    
# Build the model, optimizer, reconstruction loss and a running-mean metric.
original_dim = 784
vae = VariationalAutoEncoder(original_dim, intermediate_dim=64, latent_dim=32)

optimizer = tf.keras.optimizers.Adam(1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

loss_metric = tf.keras.metrics.Mean()

# Load the MNIST training images, reshape them to (60000, 784) and divide by 255.
(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype('float32') / 255

train_dataset = (tf.data.Dataset.from_tensor_slices(x_train)
                 .shuffle(buffer_size=1024)
                 .batch(64))

for epoch in range(3):
    print('Start of epoch %d' % (epoch,))

    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            reconstructed = vae(x_batch_train)
            # Reconstruction error plus the losses registered during the forward pass.
            loss = mse_loss_fn(x_batch_train, reconstructed) + sum(vae.losses)

        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))

        # The metric is never reset in this cell, so the reported mean spans all steps so far.
        loss_metric(loss)

        if step % 100 == 0:
            print('step %s: mean loss = %s' %(step, loss_metric.result()))


Start of epoch 0
step 0: mean loss = tf.Tensor(0.33883977, shape=(), dtype=float32)
step 100: mean loss = tf.Tensor(0.124884464, shape=(), dtype=float32)
step 200: mean loss = tf.Tensor(0.09889148, shape=(), dtype=float32)
step 300: mean loss = tf.Tensor(0.08896656, shape=(), dtype=float32)
step 400: mean loss = tf.Tensor(0.084094346, shape=(), dtype=float32)
step 500: mean loss = tf.Tensor(0.080759406, shape=(), dtype=float32)
step 600: mean loss = tf.Tensor(0.0786373, shape=(), dtype=float32)
step 700: mean loss = tf.Tensor(0.07708535, shape=(), dtype=float32)
step 800: mean loss = tf.Tensor(0.07590893, shape=(), dtype=float32)
step 900: mean loss = tf.Tensor(0.0749209, shape=(), dtype=float32)
Start of epoch 1
step 0: mean loss = tf.Tensor(0.07460664, shape=(), dtype=float32)
step 100: mean loss = tf.Tensor(0.07395489, shape=(), dtype=float32)
step 200: mean loss = tf.Tensor(0.073460914, shape=(), dtype=float32)
step 300: mean loss = tf.Tensor(0.072985545, shape=(), dtype=float32)
s

In [24]:
# Same model trained with the built-in compile/fit loop; the input doubles as the target.
vae = VariationalAutoEncoder(original_dim=784, intermediate_dim=64, latent_dim=32)
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

vae.compile(optimizer=optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x=x_train, y=x_train, epochs=3, batch_size=64)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x2615fdaaa58>

### 3. Beyond object-oriented development: the Functional API

In [25]:
# Sizes shared by the encoder and decoder definitions below.
original_dim = 784
intermediate_dim = 64
latent_dim = 32

# Encoder model: input of size original_dim -> ReLU Dense -> z_mean / z_log_var -> sampled z.
original_inputs = tf.keras.Input(shape=(original_dim,), name='encoder_input')
x = layers.Dense(intermediate_dim, activation='relu')(original_inputs)
z_mean = layers.Dense(latent_dim, name='z_mean')(x)
z_log_var = layers.Dense(latent_dim, name='z_log_var')(x)
z = Sampling()((z_mean, z_log_var))
encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name='encoder')

# Decoder model with its own Input; note that `x` and `outputs` are rebound here.
latent_inputs = tf.keras.Input(shape=(latent_dim,), name='z_sampling')
x = layers.Dense(intermediate_dim, activation='relu')(latent_inputs)
outputs = layers.Dense(original_dim, activation='sigmoid')(x)
decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name='decoder')

# End-to-end model: the decoder is applied to the encoder's sampled z.
outputs = decoder(z)
vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name='vae')

# Same KL expression as in the subclassed VariationalAutoEncoder, attached to the model.
kl_loss = - 0.5 * tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
vae.add_loss(kl_loss)

# Train with compile/fit; the input doubles as the target.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<tensorflow.python.keras.callbacks.History at 0x2627ab5b6a0>