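# Keras custom series: custom layer  
Some newer techniques may not be provided by the built-in layers of the Functional API, so you have to implement them yourself.  
In Keras you can define your own computation by subclassing the `Layer` class. A layer defined this way can be chained with other Functional API layers without any problems.  

A custom layer has several methods to override:  
+ \_\_init\_\_(self, ...): initializes the layer's parameters.
+ build(self, input_shape): sets up the parameters that depend on the input. `input_shape` gives the shape of the input, so weights of the matching size can be created automatically.
+ call(self, inputs): the forward path of the tensor; the intermediate computation is done here.
+ (opt)get_config(self): used for serialization.
+ (opt)from_config(cls, config): used for serialization; must be declared as a `classmethod`.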
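
As a minimal sketch of how these methods fit together, the hypothetical `ScaleLayer` below multiplies each input feature by a learned factor. The class name and its `initial_value` argument are made up for illustration and are not part of Keras. It relies on the base class's default `from_config()`, which simply calls `cls(**config)`.

```python
import tensorflow as tf


class ScaleLayer(tf.keras.layers.Layer):
    """Hypothetical layer: multiplies each input feature by a learned factor."""

    def __init__(self, initial_value=1.0, **kwargs):
        # Forward name/dtype/trainable to the base class.
        super().__init__(**kwargs)
        self.initial_value = initial_value

    def build(self, input_shape):
        # One scale factor per feature, sized from the last input dimension.
        self.scale = self.add_weight(
            name='scale',
            shape=(input_shape[-1],),
            initializer=tf.keras.initializers.Constant(self.initial_value),
            trainable=True,
        )

    def call(self, inputs):
        # Forward computation.
        return inputs * self.scale

    def get_config(self):
        # Start from the base config and add this layer's own arguments.
        config = super().get_config()
        config.update({'initial_value': self.initial_value})
        return config


layer = ScaleLayer(initial_value=2.0)
outputs = layer(tf.ones((1, 4)))  # build() runs on the first call
restored = ScaleLayer.from_config(layer.get_config())
print(outputs.numpy(), restored.initial_value)
```

Because `__init__` accepts `**kwargs`, the `name`, `trainable`, and `dtype` entries that the base `get_config()` returns can be passed straight back into the constructor.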

In [1]:
# Load the required libraries
import numpy as np
import tensorflow as tf
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
print('TensorFlow version:', tf.__version__)

TensorFlow version: 2.2.0


## trainable_weight & non-trainable_weight  
Weights can be created in both `__init__()` and `build()`.  
Weights are either `trainable` or `non-trainable`; the difference is whether the optimizer updates them during training.  
Below is a simple way to build a Dense layer:  

In [2]:
class D_layer(tf.keras.layers.Layer):
    def __init__(self, units, **kwargs):
        # Forward name/trainable/dtype so that from_config() can rebuild the layer
        super(D_layer, self).__init__(**kwargs)
        self.units = units
    def build(self, input_shape):
        # Create the weight matrix
        self.w = self.add_weight(
            # Shape of the weight matrix
            shape=(input_shape[-1], self.units),
            # How the weights are initialized
            initializer='random_normal',
            # Allow the optimizer to update this weight
            trainable=True
        )
        
        self.b = self.add_weight(
            shape=(self.units,),
            initializer='random_normal',
            trainable=True
        )
    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b
    def get_config(self):
        config = super(D_layer, self).get_config()
        config.update({'units': self.units})
        return config
    @classmethod
    def from_config(cls, config):
        return cls(**config)

As a quick test, suppose the input is 32-dim data and the output is 64-dim. The total number of trainable weights should be:  
32*64 (matrix) + 64 (bias) = 2112.

In [3]:
inputs = tf.keras.Input(shape=(32,))
x = D_layer(64)(inputs)
model = tf.keras.Model(inputs=inputs, outputs=x)
model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 32)]              0         
_________________________________________________________________
d_layer (D_layer)            (None, 64)                2112      
Total params: 2,112
Trainable params: 2,112
Non-trainable params: 0
_________________________________________________________________
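
The model above holds only trainable weights. As a rough sketch of the other kind, the hypothetical `RunningSum` layer below (an illustrative name, not a Keras layer) keeps a non-trainable weight that it updates by hand in `call()`, so the optimizer never touches it.

```python
import tensorflow as tf


class RunningSum(tf.keras.layers.Layer):
    """Hypothetical layer: accumulates the column sums of every batch it sees."""

    def build(self, input_shape):
        # trainable=False keeps this weight out of `trainable_weights`.
        self.total = self.add_weight(
            name='total',
            shape=(input_shape[-1],),
            initializer='zeros',
            trainable=False,
        )

    def call(self, inputs):
        # Updated manually instead of by gradient descent.
        self.total.assign_add(tf.reduce_sum(inputs, axis=0))
        return inputs


layer = RunningSum()
layer(tf.ones((2, 3)))
layer(tf.ones((2, 3)))
print(len(layer.trainable_weights), len(layer.non_trainable_weights))
print(layer.total.numpy())
```

In `model.summary()`, weights of this kind are counted under "Non-trainable params".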


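## add_loss()  
To keep a model from `overfitting`, `Regularization` is commonly used to limit the size of the weights. In a custom layer, the `regularization loss` can be added through `add_loss()`.  
When training with `fit()`, the `regularization loss` is added automatically. In a custom training loop, read `model.losses` or `layer.losses`, add them to the total loss, and then update the weights.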

In [4]:
class D_reg_layer(tf.keras.layers.Layer):
    def __init__(self, units, rate=1e-3, **kwargs):
        # Forward name/trainable/dtype so that from_config() can rebuild the layer
        super(D_reg_layer, self).__init__(**kwargs)
        self.units = units
        self.rate = rate
    def build(self, input_shape):
        # Create the weight matrix
        self.w = self.add_weight(
            # Shape of the weight matrix
            shape=(input_shape[-1], self.units),
            # How the weights are initialized
            initializer='random_normal',
            # Allow the optimizer to update this weight
            trainable=True
        )
        
        self.b = self.add_weight(
            shape=(self.units,),
            initializer='random_normal',
            trainable=True
        )
    def call(self, inputs):
        # Add the L2 regularization loss
        self.add_loss(self.rate * tf.math.reduce_sum(tf.math.square(self.w)))
        return tf.matmul(inputs, self.w) + self.b
    def get_config(self):
        config = super(D_reg_layer, self).get_config()
        config.update({'units': self.units, 'rate': self.rate})
        return config
    @classmethod
    def from_config(cls, config):
        return cls(**config)

In [5]:
tf.keras.backend.clear_session()
inputs = tf.keras.Input(shape=(32,))
x = D_reg_layer(64)(inputs)
model = tf.keras.Model(inputs=inputs, outputs=x)
model.summary()
_ = model(tf.zeros((1, 32)))

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 32)]              0         
_________________________________________________________________
d_reg_layer (D_reg_layer)    (None, 64)                2112      
Total params: 2,112
Trainable params: 2,112
Non-trainable params: 0
_________________________________________________________________


Inspect the regularization loss:

In [6]:
sum(model.losses).numpy()

0.0051943753

The losses of a single layer:

In [7]:
test_layer = D_reg_layer(64)
test_layer(tf.zeros((1, 32)))
sum(test_layer.losses).numpy()

0.005062524
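
For a custom training loop, the step of adding `model.losses` to the total loss could look roughly like the sketch below. To keep the block self-contained it uses the built-in `Dense` layer with a `kernel_regularizer` as a stand-in for the custom layer; the losses are collected through the same `model.losses` property. The data, optimizer, and loss function are arbitrary choices for illustration.

```python
import tensorflow as tf

# Stand-in model: a built-in Dense layer with an L2 penalty on its kernel.
model = tf.keras.Sequential([
    tf.keras.layers.Dense(4, kernel_regularizer=tf.keras.regularizers.l2(1e-3)),
])
optimizer = tf.keras.optimizers.SGD(learning_rate=0.1)
loss_fn = tf.keras.losses.MeanSquaredError()

# Random data, only to make the example runnable.
x = tf.random.normal((8, 3))
y = tf.random.normal((8, 4))

with tf.GradientTape() as tape:
    pred = model(x, training=True)
    data_loss = loss_fn(y, pred)
    # Losses registered by the layers are not included automatically here.
    reg_loss = tf.add_n(model.losses)
    total_loss = data_loss + reg_loss

grads = tape.gradient(total_loss, model.trainable_weights)
optimizer.apply_gradients(zip(grads, model.trainable_weights))
print(float(data_loss), float(reg_loss))
```

Reading `model.losses` inside the `GradientTape` block matters, because the regularization term has to be recorded on the tape for its gradient to reach the weights.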

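## Layers that behave differently in training and inference  
`BatchNormalization` and the `Dropout` family behave differently in training and inference. During training, Dropout randomly drops nodes; during inference it keeps every output and multiplies it by a weight. (In the actual implementation, the outputs are divided by the keep probability during training, so nothing is rescaled at inference.)  
If a custom layer works in a similar way, add a `training=None` argument to the `call()` method; this argument tells the layer whether it is in the training phase or the inference phase.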
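
The scaling described above can be checked with plain NumPy. This is only an illustration of the arithmetic, not the TensorFlow implementation; the drop rate, the random seed, and the array size are arbitrary.

```python
import numpy as np

rng = np.random.default_rng(0)
rate = 0.3                                   # drop probability
x = np.ones(100_000, dtype=np.float32)

# Training phase: drop each element with probability `rate`,
# then divide the survivors by the keep probability.
keep_mask = rng.random(x.shape) >= rate
train_out = x * keep_mask / (1.0 - rate)

# Inference phase: the input passes through unchanged.
infer_out = x

print(train_out.mean(), infer_out.mean())
```

Surviving elements become 1 / 0.7, roughly 1.43, and the mean of the training output stays close to 1, which is why no rescaling is needed at inference.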

### Example - Dropout

In [8]:
tf.keras.backend.clear_session()
inputs = tf.keras.Input(shape=(5,))
x = tf.keras.layers.Dropout(0.3)(inputs)
model = tf.keras.Model(inputs=inputs, outputs=x)
print(model(tf.ones((1, 5)), training=True))
print(model(tf.ones((1, 5))))
print(model.predict(tf.ones((1, 5))))

tf.Tensor([[1.4285715 0.        1.4285715 1.4285715 1.4285715]], shape=(1, 5), dtype=float32)
tf.Tensor([[1. 1. 1. 1. 1.]], shape=(1, 5), dtype=float32)
[[1. 1. 1. 1. 1.]]


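This shows that both `model(x)` and `model.predict(x)` run in the inference phase. To observe the training phase you must pass `training=True`; remember to do so in a custom training loop when the model contains layers that behave differently in the two phases.  
Next, DropBlock is used to show how a custom layer can implement this kind of phase-dependent behavior.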

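### DropBlock - Dropout for images  
[DropBlock](https://arxiv.org/abs/1810.12890): according to the paper, it improves image classification accuracy more than conventional Dropout.  
The idea is simple: images are spatially correlated, so dropping a whole block at a time, rather than a single pixel, helps the network learn better features.  
The implementation below follows the same approach as TensorFlow's Dropout.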

In [9]:
class DropBlock(tf.keras.layers.Layer):
    # Drop probability and block size
    def __init__(self, drop_rate=0.3, block_size=3, **kwargs):
        super(DropBlock, self).__init__(**kwargs)
        self.rate = drop_rate
        self.block_size = block_size
    
    # Add the training parameter to tell the training phase from the inference phase
    def call(self, inputs, training=None):
        # training phase
        if training:
            b = tf.shape(inputs)[0]
            random_tensor = tf.random.uniform(shape=[b, self.m_h, self.m_w, self.c]) + self.bernoulli_rate
            binary_tensor = tf.floor(random_tensor)
            binary_tensor = tf.pad(
                binary_tensor,
                [[0, 0], [self.block_size // 2, self.block_size // 2], [self.block_size // 2, self.block_size // 2], [0, 0]]
            )
            binary_tensor = tf.nn.max_pool(
                binary_tensor,
                [1, self.block_size, self.block_size, 1],
                [1,1,1,1],
                'SAME'
            )
            binary_tensor = 1 - binary_tensor
            inputs = tf.math.divide(inputs, (1 - self.rate)) * binary_tensor
        return inputs
    
    def build(self, input_shape):
        self.b, self.h, self.w, self.c = input_shape.as_list()
        self.m_h = self.h - (self.block_size // 2) * 2
        self.m_w = self.w - (self.block_size // 2) * 2
        self.bernoulli_rate = (self.rate * self.h * self.w) / (self.m_h * self.m_w * self.block_size**2)
    
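    def get_config(self):
        config = super(DropBlock, self).get_config()
        # The key must match the __init__ argument name so from_config() can rebuild the layer
        config.update({'drop_rate': self.rate, 'block_size': self.block_size})
        return config
    
    @classmethod
    def from_config(cls, config):
        return cls(**config)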
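
To make the mask construction easier to follow, here is a small NumPy sketch of the same steps on a single 2-D feature map: compute the seed probability, pad the seeds back to the full size, and expand each seed into a block with a stride-1 max-pool. The helper names `dropblock_gamma` and `dropblock_mask` are hypothetical, and the sketch assumes an odd `block_size`.

```python
import numpy as np


def dropblock_gamma(rate, height, width, block_size):
    """Seed probability, mirroring `bernoulli_rate` in build()."""
    valid_h = height - (block_size // 2) * 2
    valid_w = width - (block_size // 2) * 2
    return (rate * height * width) / (valid_h * valid_w * block_size ** 2)


def dropblock_mask(seeds, block_size):
    """Turn block-centre seeds (1 = drop) into a keep mask (1 = keep).

    Assumes an odd block_size and a 2-D `seeds` array over the valid region.
    """
    pad = block_size // 2
    centres = np.pad(seeds, pad)     # restore the full feature-map size
    windows = np.pad(centres, pad)   # zero border for the stride-1 'SAME' max-pool
    height, width = centres.shape
    dropped = np.zeros_like(centres)
    for i in range(height):
        for j in range(width):
            dropped[i, j] = windows[i:i + block_size, j:j + block_size].max()
    return 1 - dropped


gamma = dropblock_gamma(rate=0.3, height=5, width=5, block_size=3)
seeds = np.zeros((3, 3))
seeds[0, 0] = 1                      # one seed in the top-left of the valid region
mask = dropblock_mask(seeds, block_size=3)
print(round(gamma, 4))
print(mask)
```

With these inputs a single seed zeroes out one 3x3 block of the 5x5 map and leaves the other 16 positions at 1.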

In [10]:
tf.keras.backend.clear_session()
inputs = tf.keras.Input(shape=(5, 5, 3))
x = DropBlock()(inputs)
model = tf.keras.Model(inputs=inputs, outputs=x)
result = model(tf.ones((1,5,5,3)), training=True)
print('model(x, training=True):')
for i in range(3):
    print(result[:,:,:,1].numpy())
print('-'*50)
result = model(tf.ones((1,5,5,3)))
print('model(x):')
for i in range(3):
    print(result[:,:,:,1].numpy())
print('-'*50)
print('model.predict(x):')
result = model.predict(tf.ones((1,5,5,3)))
for i in range(3):
    print(result[:,:,:,1])

model(x, training=True):
[[[0.        0.        0.        1.4285715 1.4285715]
  [0.        0.        0.        1.4285715 1.4285715]
  [0.        0.        0.        1.4285715 1.4285715]
  [1.4285715 1.4285715 1.4285715 1.4285715 1.4285715]
  [1.4285715 1.4285715 1.4285715 1.4285715 1.4285715]]]
[[[0.        0.        0.        1.4285715 1.4285715]
  [0.        0.        0.        1.4285715 1.4285715]
  [0.        0.        0.        1.4285715 1.4285715]
  [1.4285715 1.4285715 1.4285715 1.4285715 1.4285715]
  [1.4285715 1.4285715 1.4285715 1.4285715 1.4285715]]]
[[[0.        0.        0.        1.4285715 1.4285715]
  [0.        0.        0.        1.4285715 1.4285715]
  [0.        0.        0.        1.4285715 1.4285715]
  [1.4285715 1.4285715 1.4285715 1.4285715 1.4285715]
  [1.4285715 1.4285715 1.4285715 1.4285715 1.4285715]]]
--------------------------------------------------
model(x):
[[[1. 1. 1. 1. 1.]
  [1. 1. 1. 1. 1.]
  [1. 1. 1. 1. 1.]
  [1. 1. 1. 1. 1.]
  [1. 1. 1. 1. 1.]]]
[

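## A custom layer can contain other layers  
A custom layer can also use other layers in its computation, including other custom layers and Functional API layers.  
Here the earlier example is wrapped in a custom layer.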

In [11]:
class MNIST(tf.keras.layers.Layer):
    def __init__(self,name=None, **kwargs):
        super(MNIST, self).__init__(name=name, **kwargs)
        self.conv_1 = tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation='relu')
        self.max_pool_1 = tf.keras.layers.MaxPooling2D()
        self.conv_2 = tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation='relu')
        self.max_pool_2 = tf.keras.layers.MaxPooling2D()
        self.flatten = tf.keras.layers.Flatten()
        self.drop = tf.keras.layers.Dropout(0.5)
        self.out = tf.keras.layers.Dense(10, activation='softmax')
    def call(self, inputs, training=None):
        x = self.conv_1(inputs)
        x = self.max_pool_1(x)
        x = self.conv_2(x)
        x = self.max_pool_2(x)
        x = self.flatten(x)
        x = self.drop(x, training=training)
        x = self.out(x)
        return x
        

In [12]:
tf.keras.backend.clear_session()
inputs = tf.keras.Input(shape=(28, 28, 1))
x = MNIST()(inputs)
model = tf.keras.Model(inputs=inputs, outputs=x)
model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 28, 28, 1)]       0         
_________________________________________________________________
mnist (MNIST)                (None, 10)                34826     
Total params: 34,826
Trainable params: 34,826
Non-trainable params: 0
_________________________________________________________________
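
The summary reports the whole wrapper as a single row because the weights of the inner layers are tracked by the outer layer. A smaller sketch of the same pattern is shown below, using a hypothetical `MLPBlock` (the name and layer sizes are chosen only for illustration) and passing `training` to the inner `Dropout` explicitly.

```python
import tensorflow as tf


class MLPBlock(tf.keras.layers.Layer):
    """Hypothetical composite layer: Dense -> Dropout -> Dense."""

    def __init__(self, hidden_units, out_units, **kwargs):
        super().__init__(**kwargs)
        # Sub-layers assigned as attributes are tracked automatically.
        self.hidden = tf.keras.layers.Dense(hidden_units, activation='relu')
        self.drop = tf.keras.layers.Dropout(0.5)
        self.out = tf.keras.layers.Dense(out_units)

    def call(self, inputs, training=None):
        x = self.hidden(inputs)
        # Pass the phase on to layers that depend on it.
        x = self.drop(x, training=training)
        return self.out(x)


block = MLPBlock(hidden_units=8, out_units=2)
outputs = block(tf.ones((1, 4)))     # the first call builds the sub-layers
print(len(block.trainable_weights), block.count_params())
```

The outer layer exposes four weight tensors (two kernels and two biases), and the parameter count is (4*8 + 8) + (8*2 + 2) = 58.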


In [13]:
#download MNIST dataset
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train.astype('float32') / 255
y_train = y_train.astype('float32')
x_test = x_test.astype('float32') / 255
y_test = y_test.astype('float32')
x_train = np.expand_dims(x_train, -1)
y_train = np.expand_dims(y_train, -1)
x_test = np.expand_dims(x_test, -1)
y_test = np.expand_dims(y_test, -1)

In [14]:
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer='adam',
    metrics=['sparse_categorical_accuracy']
)
history = model.fit(x_train, y_train, batch_size=128, epochs=2, validation_split=0.1)

Epoch 1/2
Epoch 2/2


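## **Summary**  
A custom layer makes it easy to define your own tensor computation, which gives a lot of flexibility when implementing methods from new papers. As long as you follow the conventions, Keras handles everything else for you.  
The three main methods:  
+ `__init__()`: initializes the layer's parameters.  
+ `build()`: sets up this layer's shape-dependent parameters from the output shape of the previous layer.  
+ `call()`: where the forward computation on tensors happens. For layers that behave differently in training and inference, remember to add the `training=None` argument so the layer can tell the two phases apart.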