# CS492 Special Topics in Computer Science (AI Industry and Smart Energy)
## Deep Learning Practice 
#### Prof. Ho-Jin Choi
#### School of Computing, KAIST

---

### 7-5. Custom layers 

In [None]:
from __future__ import absolute_import, division, print_function, unicode_literals

try:
    # %tensorflow_version only exists in Colab.
    %tensorflow_version 2.x
except Exception:
    pass
import tensorflow as tf

tf.keras.backend.clear_session()  # For easy reset of notebook state.

#### The Layer class
The main data structure you'll work with is the Layer. A layer encapsulates both a **state (the layer's _"weights"_)** and a transformation from inputs to outputs **(a "call", the layer's _forward pass_)**.

Here's a densely-connected layer. It has a state: the variables `w` and `b`.

In [None]:
from tensorflow.keras import layers


class Linear(layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
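        # Weights created explicitly as tf.Variables
        w_init = tf.random_normal_initializer()
        self.w = tf.Variable(initial_value=w_init(shape=(input_dim, units),
                                                  dtype='float32'),
                             trainable=True)

        b_init = tf.zeros_initializer()
        self.b = tf.Variable(initial_value=b_init(shape=(units,),
                                                  dtype='float32'),
                             trainable=True)

    def call(self, inputs):
        # Forward pass: inputs @ w + b
        return tf.matmul(inputs, self.w) + self.b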

    
x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)

Note that the weights `w` and `b` are automatically tracked by the layer upon being set as layer attributes:

In [None]:
print(linear_layer.weights)

In [None]:
print([linear_layer.w])

In [None]:
print([linear_layer.b])

In [None]:
assert linear_layer.weights == [linear_layer.w, linear_layer.b]

Note that you also have access to a quicker shortcut for **adding weights to a layer**: the `add_weight` method:

In [None]:
class Linear(layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
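        # Using add_weight: the layer creates and tracks the variables
        self.w = self.add_weight(shape=(input_dim, units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(units,),
                                 initializer='zeros',
                                 trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b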

x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)

#### Layers can have non-trainable weights
Besides trainable weights, you can add _non-trainable weights_ to a layer as well. Such weights are not updated by backpropagation when you train the layer.

Here's how to add and use a non-trainable weight:

In [None]:
class ComputeSum(layers.Layer):
    def __init__(self, input_dim):
        super(ComputeSum, self).__init__()
        self.total = tf.Variable(initial_value=tf.zeros((input_dim,)),
                                 trainable=False)
        
    def call(self, inputs):
        self.total.assign_add(tf.reduce_sum(inputs, axis=0))
        return self.total

x = tf.ones((2, 2))
my_sum = ComputeSum(2)
y = my_sum(x)
print(y.numpy())

It's part of `layer.weights`, but it gets categorized as a non-trainable weight:

In [None]:
print('weights:', len(my_sum.weights))
print('non-trainable weights:', len(my_sum.non_trainable_weights))

# It's not included in the trainable weights:
print('trainable_weights:', my_sum.trainable_weights)
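To see what it means for a weight to be left out of backpropagation, the sketch below re-creates the `ComputeSum` accumulator and asks a `tf.GradientTape` for the gradient with respect to `total`. It assumes `add_weight(..., trainable=False)` in place of the raw `tf.Variable` used above (an equivalent way to register a non-trainable weight). Since the tape automatically watches only trainable variables, the gradient comes back as `None`.

```python
import tensorflow as tf
from tensorflow.keras import layers


class ComputeSum(layers.Layer):
    """Accumulator like the one above, with `total` as a non-trainable weight."""

    def __init__(self, input_dim):
        super(ComputeSum, self).__init__()
        # add_weight(..., trainable=False) registers a non-trainable weight
        self.total = self.add_weight(shape=(input_dim,),
                                     initializer='zeros',
                                     trainable=False)

    def call(self, inputs):
        self.total.assign_add(tf.reduce_sum(inputs, axis=0))
        return tf.identity(self.total)


my_sum = ComputeSum(2)
x = tf.ones((2, 2))

with tf.GradientTape() as tape:
    # The tape automatically watches trainable variables only
    y = my_sum(x)
    loss = tf.reduce_sum(y)

# `total` was never watched, so no gradient is computed for it
grad = tape.gradient(loss, my_sum.total)
print('gradient for total:', grad)
print('total:', my_sum.total.numpy())
```

The accumulated value still changes on every call; it is simply updated by `assign_add` in the forward pass rather than by an optimizer.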

**Best practice: deferring weight creation until the shape of the inputs is known** <br>
In the `Linear` layer example above, our layer took an `input_dim` argument that was used to compute the shape of the weights `w` and `b` in `__init__`:
```python
class Linear(layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        self.w = self.add_weight(shape=(input_dim, units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(units,),
                                 initializer='zeros',
                                 trainable=True)
```

In many cases, **you may not know in advance the size of your inputs**, and you **would like to lazily create weights when that value becomes known**, some time after instantiating the layer.

In the Keras API, we recommend creating layer weights in the `build(input_shape)` method of your layer, like this:

In [None]:
class Linear(layers.Layer):
    def __init__(self, units=32):
        super(Linear, self).__init__()
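        self.units = units

    def build(self, input_shape):
        # inputs @ w requires w.shape[0] == input_shape[-1]:
        # inputs of shape (batch, d) times w of shape (d, units) -> (batch, units)
        self.w = self.add_weight(shape=(input_shape[-1], self.units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units,),
                                 initializer='zeros',
                                 trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b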

The **`__call__` method of your layer will automatically run `build` the first time it is called**. You now have a layer that's lazy and easy to use:

In [None]:
linear_layer = Linear(32)  # At instantiation, we don't know on what inputs this is going to get called
y = linear_layer(x)  # The layer's weights are created dynamically the first time the layer is called
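A small self-contained check of this lazy behavior, repeating the `build`-based `Linear` layer so the block runs on its own: before the first call the layer is not built and owns no weights; after it, the first dimension of `w` matches whatever feature size the input had. The input sizes 16 and 4 are arbitrary choices for illustration.

```python
import tensorflow as tf
from tensorflow.keras import layers


class Linear(layers.Layer):
    """Same lazy Linear layer as above: weights are created in build()."""

    def __init__(self, units=32):
        super(Linear, self).__init__()
        self.units = units

    def build(self, input_shape):
        # The input feature size is only known here, on the first call
        self.w = self.add_weight(shape=(input_shape[-1], self.units),
                                 initializer='random_normal',
                                 trainable=True)
        self.b = self.add_weight(shape=(self.units,),
                                 initializer='zeros',
                                 trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b


layer_a = Linear(8)
layer_b = Linear(8)
print('before call:', layer_a.built, len(layer_a.weights))  # not built, no weights

# The same class adapts to different input sizes
y_a = layer_a(tf.ones((3, 16)))
y_b = layer_b(tf.ones((3, 4)))
print('after call:', layer_a.built, layer_a.w.shape, layer_b.w.shape)
```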

#### Layers are recursively composable
If you assign a Layer instance as an attribute of another Layer, the outer layer will start tracking the weights of the inner layer.

We recommend creating such sublayers in the `__init__` method (since the sublayers will typically have a build method, they will be built when the outer layer gets built).

In [None]:
# Let's assume we are reusing the Linear class
# with a `build` method that we defined above.

class MLPBlock(layers.Layer):
    def __init__(self):
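        super(MLPBlock, self).__init__()
        # Sublayers created here are tracked and built lazily on the first call
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(1)

    def call(self, inputs):
        x = self.linear_1(inputs)
        x = tf.nn.relu(x)
        x = self.linear_2(x)
        x = tf.nn.relu(x)
        return self.linear_3(x)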

mlp = MLPBlock()
y = mlp(tf.ones(shape=(3, 64)))  # The first call to the `mlp` will create the weights
print('weights:', len(mlp.weights))
print('trainable weights:', len(mlp.trainable_weights))
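Sublayers do not have to be stored as individual attributes: Keras also tracks layers held in a Python list that is assigned to an attribute. A minimal sketch with a hypothetical `StackedDense` block, which uses the built-in `layers.Dense` rather than the custom `Linear` so it runs on its own:

```python
import tensorflow as tf
from tensorflow.keras import layers


class StackedDense(layers.Layer):
    """Hypothetical MLP block whose sublayers live in a Python list."""

    def __init__(self, sizes=(32, 32, 1)):
        super(StackedDense, self).__init__()
        # Layers inside a list attribute are tracked like single attributes
        self.hidden = [layers.Dense(n) for n in sizes]

    def call(self, inputs):
        x = inputs
        for dense in self.hidden[:-1]:
            x = tf.nn.relu(dense(x))
        return self.hidden[-1](x)


stack = StackedDense()
y = stack(tf.ones(shape=(3, 64)))
print('weights:', len(stack.weights))  # one kernel and one bias per Dense
print('output shape:', y.shape)
```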

**Layers recursively collect losses created during the forward pass** <br>
When writing the `call` method of a layer, you can create _loss tensors_ that you will want to use later, when writing your training loop. You can do this by calling `self.add_loss(value)`:

In [None]:
# A layer that creates an activity regularization loss
class ActivityRegularizationLayer(layers.Layer):
    def __init__(self, rate=1e-2):
        super(ActivityRegularizationLayer, self).__init__()
        self.rate = rate

    def call(self, inputs):
        # Compute a loss from this layer's inputs; add_loss() records it
        # so that it appears in `layer.losses` after the forward pass
        self.add_loss(self.rate * tf.reduce_sum(inputs))
        return inputs

These losses (including those created by any inner layer) can be retrieved via `layer.losses`. This property is reset at the start of every `__call__` to the top-level layer, so that `layer.losses` always contains the loss values created during the last forward pass.

In [None]:
class OuterLayer(layers.Layer):
    def __init__(self):
        super(OuterLayer, self).__init__()
        self.activity_reg = ActivityRegularizationLayer(1e-2)

    def call(self, inputs):
        return self.activity_reg(inputs)

layer = OuterLayer()
assert len(layer.losses) == 0  # No losses yet since the layer has never been called
_ = layer(tf.zeros((1, 1)))
assert len(layer.losses) == 1  # We created one loss value

# `layer.losses` gets reset at the start of each __call__
_ = layer(tf.zeros((1, 1)))
assert len(layer.losses) == 1  # This is the loss created during the call above
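With more than one inner layer, `layer.losses` holds one entry per `add_loss()` call made anywhere in the nested forward pass, and it is still reset on each top-level call. A minimal sketch with a hypothetical `TwoRegLayers` outer layer (the `ActivityRegularizationLayer` is repeated so the block is self-contained):

```python
import tensorflow as tf
from tensorflow.keras import layers


class ActivityRegularizationLayer(layers.Layer):
    """Same layer as above: adds rate * sum(inputs) as a loss."""

    def __init__(self, rate=1e-2):
        super(ActivityRegularizationLayer, self).__init__()
        self.rate = rate

    def call(self, inputs):
        self.add_loss(self.rate * tf.reduce_sum(inputs))
        return inputs


class TwoRegLayers(layers.Layer):
    """Hypothetical outer layer holding two regularizing sublayers."""

    def __init__(self):
        super(TwoRegLayers, self).__init__()
        self.reg_1 = ActivityRegularizationLayer(1e-2)
        self.reg_2 = ActivityRegularizationLayer(1e-1)

    def call(self, inputs):
        return self.reg_2(self.reg_1(inputs))


outer = TwoRegLayers()
_ = outer(tf.ones((2, 3)))
print(len(outer.losses), [float(v) for v in outer.losses])  # one loss per sublayer

_ = outer(tf.ones((2, 3)))
print(len(outer.losses))  # the list was reset before this second call
```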

These losses are meant to be taken into account when writing training loops, like this:

```python
# Instantiate an optimizer.
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Iterate over the batches of a dataset.
for x_batch_train, y_batch_train in train_dataset:
    with tf.GradientTape() as tape:
        logits = model(x_batch_train)  # Logits for this minibatch
        # Loss value for this minibatch
        loss_value = loss_fn(y_batch_train, logits)
        # Add extra losses created during this forward pass:
        loss_value += sum(model.losses)

    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
```
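The loop above assumes a `model` and a `train_dataset` that this section does not define. A runnable sketch under stated assumptions: random features and integer labels stand in for real data, and a hypothetical `TinyClassifier` model places the `ActivityRegularizationLayer` between two `Dense` layers so that `model.losses` is non-empty.

```python
import tensorflow as tf
from tensorflow.keras import layers


class ActivityRegularizationLayer(layers.Layer):
    """Same layer as above: adds rate * sum(inputs) as a loss."""

    def __init__(self, rate=1e-2):
        super(ActivityRegularizationLayer, self).__init__()
        self.rate = rate

    def call(self, inputs):
        self.add_loss(self.rate * tf.reduce_sum(inputs))
        return inputs


class TinyClassifier(tf.keras.Model):
    """Hypothetical model: Dense -> activity regularization -> Dense logits."""

    def __init__(self, num_classes=10):
        super(TinyClassifier, self).__init__()
        self.hidden = layers.Dense(32, activation='relu')
        self.activity_reg = ActivityRegularizationLayer(1e-2)
        self.classifier = layers.Dense(num_classes)

    def call(self, inputs):
        return self.classifier(self.activity_reg(self.hidden(inputs)))


# Synthetic stand-in data: 256 samples, 20 features, 10 integer classes
features = tf.random.normal((256, 20))
labels = tf.random.uniform((256,), maxval=10, dtype=tf.int32)
train_dataset = tf.data.Dataset.from_tensor_slices((features, labels)).batch(32)

model = TinyClassifier()
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

for x_batch_train, y_batch_train in train_dataset:
    with tf.GradientTape() as tape:
        logits = model(x_batch_train)
        loss_value = loss_fn(y_batch_train, logits)
        # Add the activity regularization loss from this forward pass
        loss_value += sum(model.losses)

    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

print('last batch loss:', float(loss_value))
```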

#### (Optional) Privileged training argument in the call method
Some layers, in particular the `BatchNormalization` layer and the `Dropout` layer, have different behaviors during training and inference. For such layers, **it is standard practice to expose a boolean `training` argument in the `call` method.**

By exposing this argument in `call`, you enable the built-in training and evaluation loops (e.g. `fit()`) to correctly use the layer in training and inference.

In [None]:
class CustomDropout(layers.Layer):
    def __init__(self, rate, **kwargs):
        super(CustomDropout, self).__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        if training:
            return tf.nn.dropout(inputs, rate=self.rate)
        return inputs
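A short check of both modes, repeating `CustomDropout` so the block runs on its own. Called directly, the flag switches the behavior; wrapped in a `tf.keras.Sequential` model, the `training` value passed to the model is forwarded to the layer. With `rate=0.5`, `tf.nn.dropout` zeroes some entries and scales the kept ones by `1 / (1 - rate)`, so a tensor of ones becomes a mix of 0s and 2s.

```python
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers


class CustomDropout(layers.Layer):
    """Same layer as above: dropout only when `training` is truthy."""

    def __init__(self, rate, **kwargs):
        super(CustomDropout, self).__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        if training:
            return tf.nn.dropout(inputs, rate=self.rate)
        return inputs


x = tf.ones((4, 10))
drop = CustomDropout(0.5)

inference_out = drop(x, training=False)  # inputs returned unchanged
training_out = drop(x, training=True)    # entries are 0 or 1 / (1 - 0.5) = 2

# When calling a model, its `training` flag is forwarded to inner layers
model = tf.keras.Sequential([CustomDropout(0.5)])
model_train_out = model(x, training=True)
model_infer_out = model(x, training=False)

print(np.unique(training_out.numpy()))
```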

#### Building Models 
In general, you will **use the `Layer` class to define inner computation blocks, and will use the `Model` class to define the outer model** -- the object you will train.

For instance, in a ResNet50 model, you would have several ResNet blocks subclassing `Layer`, and a single `Model` encompassing the entire ResNet50 network.

Effectively, the "Layer" class corresponds to what we refer to in the literature as a "layer" (as in "convolution layer" or "recurrent layer") or as a "block" (as in "ResNet block" or "Inception block").

Meanwhile, the "Model" class corresponds to what is referred to in the literature as a "model" (as in "deep learning model") or as a "network" (as in "deep neural network").

For instance, consider the mini-ResNet below, written with the functional API. Its repeated conv/conv/add pattern can be packaged as a `Layer` subclass and used to build a `Model` that we could train with `fit()` and save with `save_weights()`:

```python
import tensorflow as tf
from tensorflow.keras import layers

# Example input: 32x32 RGB images (an assumed shape, for illustration)
inputs = tf.keras.Input(shape=(32, 32, 3))

x = layers.Conv2D(32, 3, activation='relu')(inputs)
x = layers.Conv2D(64, 3, activation='relu')(x)
block_1_output = layers.MaxPooling2D(3)(x)

x = layers.Conv2D(64, 3, activation='relu', padding='same')(block_1_output)
x = layers.Conv2D(64, 3, activation='relu', padding='same')(x)
block_2_output = layers.add([x, block_1_output])

x = layers.Conv2D(64, 3, activation='relu', padding='same')(block_2_output)
x = layers.Conv2D(64, 3, activation='relu', padding='same')(x)
block_3_output = layers.add([x, block_2_output])
```
--> Wrap the repeated conv/conv/add pattern in a `Layer` subclass, `ResNetBlock` <br>
--> Then, create as many `ResNetBlock()` instances as needed inside a `Model`:

```python
class ResNet(tf.keras.Model):
    def __init__(self):
        super(ResNet, self).__init__()
        self.block_1 = ResNetBlock()
        self.block_2 = ResNetBlock()
        self.global_pool = layers.GlobalAveragePooling2D()
        self.classifier = layers.Dense(num_classes)

    def call(self, inputs):
        x = self.block_1(inputs)
        x = self.block_2(x)
        x = self.global_pool(x)
        return self.classifier(x)


resnet = ResNet()
dataset = ...
# compile() must be called with an optimizer and a loss before fit()
resnet.fit(dataset, epochs=10)
resnet.save_weights(filepath)
```
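The `ResNet` model above relies on a `ResNetBlock` layer that this section never defines. A minimal sketch of one possible definition, mirroring `block_2`/`block_3` of the functional snippet (two 3x3 `'same'` convolutions plus a skip connection). The `filters` argument and the 1x1 projection of the shortcut, used when the input channel count differs from `filters`, are assumptions added so the block can also sit directly on raw images.

```python
import tensorflow as tf
from tensorflow.keras import layers


class ResNetBlock(layers.Layer):
    """Hypothetical residual block: conv -> conv, then add the shortcut."""

    def __init__(self, filters=64, **kwargs):
        super(ResNetBlock, self).__init__(**kwargs)
        self.filters = filters
        self.conv_1 = layers.Conv2D(filters, 3, activation='relu', padding='same')
        self.conv_2 = layers.Conv2D(filters, 3, activation='relu', padding='same')

    def build(self, input_shape):
        # Assumption: project the shortcut with a 1x1 conv when channel
        # counts differ, so that the two tensors can be added
        if input_shape[-1] != self.filters:
            self.projection = layers.Conv2D(self.filters, 1)
        else:
            self.projection = None

    def call(self, inputs):
        shortcut = inputs if self.projection is None else self.projection(inputs)
        x = self.conv_1(inputs)
        x = self.conv_2(x)
        return x + shortcut


# Identity shortcut: 64 input channels, 64 filters
block = ResNetBlock(filters=64)
out = block(tf.ones((2, 9, 9, 64)))

# Projected shortcut: 3 input channels (e.g. RGB), 16 filters
proj_block = ResNetBlock(filters=16)
out_proj = proj_block(tf.ones((2, 8, 8, 3)))
print(out.shape, out_proj.shape)
```

Because `'same'` padding keeps height and width unchanged, the residual sum only has to reconcile the channel dimension, which is why the projection is keyed on `input_shape[-1]` alone.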

#### Putting it all together: an end-to-end example
Here's what you've learned so far: 
- A `Layer` encapsulates a state (created in `__init__` or `build`) and some computation (in `call`). 
- Layers can be recursively nested to create new, bigger computation blocks. 
- Layers can create and track losses (typically regularization losses).
- The outer container, the thing you want to train, is a `Model`. A `Model` is just like a `Layer`, but with added training and serialization utilities.
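A small illustration of the last point, using a hypothetical one-layer `TinyModel` and random data: a `Model` is itself a `Layer`, and it adds `compile()`/`fit()`/`predict()` for training and `save_weights()`/`load_weights()` for serialization. The `.weights.h5` file name is an assumption chosen because recent Keras versions require that suffix for `save_weights()`, and older `tf.keras` versions treat any `.h5` suffix as HDF5.

```python
import os
import tempfile

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers


class TinyModel(tf.keras.Model):
    """Hypothetical one-layer model used only to illustrate Model utilities."""

    def __init__(self):
        super(TinyModel, self).__init__()
        self.dense = layers.Dense(4)

    def call(self, inputs):
        return self.dense(inputs)


x = np.random.rand(8, 3).astype('float32')
y = np.random.rand(8, 4).astype('float32')

model = TinyModel()
# Training utilities come from Model, not from Layer
model.compile(optimizer='sgd', loss='mse')
model.fit(x, y, epochs=1, batch_size=4, verbose=0)
before = model.predict(x, verbose=0)

# Serialization utilities: save the weights, restore them into a fresh instance
path = os.path.join(tempfile.mkdtemp(), 'tiny.weights.h5')
model.save_weights(path)

restored = TinyModel()
restored(x)  # call once so that the weights exist before loading
restored.load_weights(path)
after = restored.predict(x, verbose=0)

print('Model is a Layer:', isinstance(model, layers.Layer))
print('same predictions after reload:', np.allclose(before, after))
```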

Let's put all of these things together into an end-to-end example: we're going to implement a Variational AutoEncoder (VAE). We'll train it on MNIST digits.

Our VAE will be a subclass of `Model`, built as a nested composition of layers that subclass Layer. It will feature a regularization loss (KL divergence).

<img src=https://lilianweng.github.io/lil-log/assets/images/vae-gaussian.png>
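For reference, here are the two formulas behind the code that follows, writing `z_mean` as $\mu$ and `z_log_var` as $\log\sigma^2$. `Sampling` implements the reparameterization trick, and the KL term added in `VariationalAutoEncoder.call` is the closed-form divergence between the encoder's diagonal Gaussian and a standard normal prior over $d$ latent dimensions:

$$z = \mu + \exp\left(\tfrac{1}{2}\log\sigma^2\right) \odot \epsilon, \qquad \epsilon \sim \mathcal{N}(0, I)$$

$$D_{\mathrm{KL}}\left(\mathcal{N}(\mu, \operatorname{diag}\sigma^2) \,\|\, \mathcal{N}(0, I)\right) = -\frac{1}{2}\sum_{j=1}^{d}\left(1 + \log\sigma_j^2 - \mu_j^2 - \sigma_j^2\right)$$

The code averages with `tf.reduce_mean` over both the batch and the $d$ latent dimensions instead of summing over $j$, so the value it adds is the batch-averaged KL divergence divided by $d$.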

In [None]:
class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon
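A quick numerical check of the sampling layer, assuming `tf.random.normal` as a drop-in for `tf.keras.backend.random_normal` (both draw standard normal noise). With every mean set to 2 and every standard deviation set to 0.5 (log-variance `log(0.25)`), the samples should have mean near 2 and standard deviation near 0.5. Because the randomness is isolated in `epsilon`, gradients still reach both `z_mean` and `z_log_var`.

```python
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers


class Sampling(layers.Layer):
    """Reparameterization trick: z = z_mean + exp(0.5 * z_log_var) * epsilon."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        # Swapped in: tf.random.normal instead of tf.keras.backend.random_normal
        epsilon = tf.random.normal(shape=tf.shape(z_mean))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon


# Illustrative values: mean 2 and standard deviation 0.5 everywhere
z_mean = tf.fill((20000, 2), 2.0)
z_log_var = tf.fill((20000, 2), float(np.log(0.25)))

with tf.GradientTape() as tape:
    tape.watch([z_mean, z_log_var])
    z = Sampling()((z_mean, z_log_var))
    loss = tf.reduce_mean(z)

# Gradients flow through the deterministic part of the sample
grad_mean, grad_log_var = tape.gradient(loss, [z_mean, z_log_var])
print('sample mean:', float(tf.reduce_mean(z)))
print('sample std:', float(tf.math.reduce_std(z)))
```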

In [None]:
class Encoder(layers.Layer):
    """Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""

    def __init__(self,
               latent_dim=32,
               intermediate_dim=64,
               name='encoder',
               **kwargs):
        super(Encoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        x = self.dense_proj(inputs)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z

In [None]:
class Decoder(layers.Layer):
    """Converts z, the encoded digit vector, back into a readable digit."""

    def __init__(self,
               original_dim,
               intermediate_dim=64,
               name='decoder',
               **kwargs):
        super(Decoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
        self.dense_output = layers.Dense(original_dim, activation='sigmoid')

    def call(self, inputs):
        x = self.dense_proj(inputs)
        return self.dense_output(x)

In [None]:
class VariationalAutoEncoder(tf.keras.Model):
    """Combines the encoder and decoder into an end-to-end model for training."""

    def __init__(self,
               original_dim,
               intermediate_dim=64,
               latent_dim=32,
               name='autoencoder',
               **kwargs):
        super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim,
                               intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # Add KL divergence regularization loss.
        kl_loss = - 0.5 * tf.reduce_mean(
            z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
        self.add_loss(kl_loss)
        return reconstructed

In [None]:
original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

loss_metric = tf.keras.metrics.Mean()

(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype('float32') / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

In [None]:
# Iterate over epochs.
for epoch in range(3):
    print('Start of epoch %d' % (epoch,))

    # Iterate over the batches of the dataset.
    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            reconstructed = vae(x_batch_train)
            # Compute reconstruction loss
            loss = mse_loss_fn(x_batch_train, reconstructed)
            loss += sum(vae.losses)  # Add KLD regularization loss

        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))

        loss_metric(loss)

        if step % 100 == 0:
            print('step %s: mean loss = %s' % (step, loss_metric.result()))

Note that since the VAE is subclassing `Model`, it features **built-in training loops** (i.e., `fit()`). So you could also have trained it like this:

In [None]:
vae = VariationalAutoEncoder(784, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)