# Exercise6: TensorFlow進階技巧

<table class="tfo-notebook-buttons" align="left">
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/taipeitechmmslab/MMSLAB-TF2/blob/master/Exercise/Exercise6.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png" />Run in Google Colab</a>
  </td>
  <td>
    <a target="_blank" href="https://github.com/taipeitechmmslab/MMSLAB-TF2/blob/master/Exercise/Exercise6.ipynb"><img src="https://www.tensorflow.org/images/GitHub-Mark-32px.png" />View source on GitHub</a>
  </td>
</table>

In [1]:
import tensorflow as tf

## Custom Layers
透過繼承`tf.keras.layers.Layer`類別，來輕鬆創建字定義的網路層。

你可以到https://www.tensorflow.org/api_docs/python/tf/keras/layers 官方API察看更多的網路層。

Example: 簡單的客自化Convolutional layers。

In [2]:
class CustomConv2D(tf.keras.layers.Layer):
    def __init__(self, filters, kernel_size, strides=(1, 1), padding="VALID", **kwargs):
        super(CustomConv2D, self).__init__(**kwargs)
        self.filters = filters
        self.kernel_size = kernel_size
        self.strides = (1, *strides, 1)
        self.padding = padding

    def build(self, input_shape):
        kernel_h, kernel_w = self.kernel_size
        input_dim = input_shape[-1]
        # 創建卷積的權重值(weights)
        self.w = self.add_weight(name='kernel', 
                                 shape=(kernel_h, kernel_w, input_dim, self.filters),
                                 initializer='glorot_uniform',  # 設定初始化方法
                                 trainable=True)  # 設定這個權重是否能夠訓練(更動)
        # 創建卷積的偏差值(bias)
        self.b = self.add_weight(name='bias', 
                                 shape=(self.filters,),
                                 initializer='zeros',  # 設定初始化方法
                                 trainable=True)  # 設定這個權重是否能夠訓練(更動)

    def call(self, inputs):
        x = tf.nn.conv2d(inputs, self.w, self.strides, padding=self.padding) # 卷積運算
        x = tf.nn.bias_add(x, self.b)  # 加上偏差值
        x = tf.nn.relu(x)  # 激活函數
        return x

## Custom Dense Layers
透過繼承`tf.keras.layers.Layer`類別，來輕鬆創建字定義的網路層。

你可以到https://www.tensorflow.org/api_docs/python/tf/keras/layers 官方API察看更多的網路層。

In [3]:
class CustomDense(tf.keras.layers.Layer):
    def __init__(self, unit, activation=None, use_bias=True, **kwargs):
        super(CustomDense, self).__init__(**kwargs)
        self.unit = unit
        self.activation = tf.keras.activations.get(activation)
        self.use_bias = use_bias
    
    def build(self, input_shape):
        input_dim = input_shape[-1]
        #========================= Write your code here ========================= #


        if self.use_bias:

        #======================================================================== #

    def call(self, inputs):
        outputs = tf.matmul(inputs, self.w) + self.b
        if self.activation is not None:
            return self.activation(outputs)
        return outputs

## Custom Loss

在設計進階的研究方法或更深入解決問題時，TensorFlow官方文件所提供的損失函數，通常不夠使用，這時候你就必須自己定義損失函數，實作自己設計的損失函數。

Example: 簡單的客自化Crossentropy Loss。

In [4]:
def custom_categorical_crossentropy(y_true, y_pred):
    # x = tf.reduce_mean(-tf.reduce_sum(y_true * tf.log(y_pred), reduction_indices=[1]))
    x = tf.nn.softmax_cross_entropy_with_logits(labels=y_true, logits=y_pred)
    return x

## Custom Metrics

如果你需要的指標函數官方API並沒有提供，你可以通過繼承`tf.keras.metrics.Metric`類別，來輕鬆創建自定義的指標函數。

Example: 計算多少個樣本是正確分類的。

In [5]:
class CustomCategoricalAccuracy(tf.keras.metrics.Metric):
    def __init__(self, name='custom_catrgorical_accuracy', **kwargs):
        super(CustomCategoricalAccuracy, self).__init__(name=name, **kwargs)
        # 記錄正確預測的數量
        self.correct = self.add_weight('correct_numbers', initializer='zeros')
        # 記錄全部資料的量數
        self.total = self.add_weight('total_numbers', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        # 輸入答案為One-hot編碼，所以取最大的數值為答案
        y_true = tf.argmax(y_true, axis=-1)
        # 取預測輸出最大的數值為預測結果
        y_pred = tf.argmax(y_pred, axis=-1)
        # 比較預測結果是否正確，正確會返回True(正確)，錯誤會返回False(錯誤)
        values = tf.equal(y_true, y_pred)
        # 轉為浮點數True(正確)=1.0，False(錯誤)=0.0
        values = tf.cast(values, tf.float32)
        # 將values所有數值相加就會等於正確預測的總數
        values_sum = tf.reduce_sum(values)
        # 計算這個Batch的資料數量
        num_values = tf.cast(tf.size(values), tf.float32)
        self.correct.assign_add(values_sum)  # 更新正確預測的總數
        self.total.assign_add(num_values)  # 更新資料量的總數

    def result(self):
        # 計算準確率
        return tf.math.divide_no_nan(self.correct, self.total)

    def reset_states(self):
        # 每一次Epoch結束後會重新初始化變數
        self.correct.assign(0.)
        self.total.assign(0.)

## Custom Callbacks

如果你需要的監控或執行的操作官方的Callbacks函數沒有提供，你可以通過繼承`tf.keras.callbacks.Callback`類別，來輕鬆創建字定義的Callbacks函數。

Example: 紀錄每一個batch的loss值。

In [6]:
class SaveModel(tf.keras.callbacks.Callback):
    def __init__(self, weights_file, monitor='loss', mode='min', save_weights_only=False):
        super(SaveModel, self).__init__()
        self.weights_file = weights_file
        self.monitor = monitor
        self.mode = mode
        self.save_weights_only = save_weights_only
        if mode == 'min':
            self.best = np.Inf
        else:
            self.best = -np.Inf
        
    def save_model(self):
        if self.save_weights_only:
            self.model.save_weights(self.weights_file)
        else:
            self.model.save(self.weights_file)

    def on_epoch_end(self, epoch, logs=None):
        monitor_value = logs.get(self.monitor)
        if self.mode == 'min' and monitor_value < self.best:
            self.save_model()
            self.best = monitor_value
        elif self.mode == 'max' and monitor_value > self.best:
            self.save_model()
            self.best = monitor_value

# 實驗：比較Keras高階API和客製化API兩種網路訓練的結果

### Import 必要套件

In [7]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import initializers
from preprocessing import parse_aug_fn, parse_fn

### Cifar 10
載入Cifar10數據集

In [8]:
# 將train Data重新分成9:1等分，分別分給train data, valid data
train_split, valid_split = ['train[:90%]', 'train[90%:]']
# 取得訓練數據，並順便讀取data的資訊
train_data, info = tfds.load("cifar10", split=train_split, with_info=True)
# 取得驗證數據
valid_data = tfds.load("cifar10", split=valid_split)
# 取得測試數據
test_data = tfds.load("cifar10", split=tfds.Split.TEST)

### Dataset 設定

In [9]:
AUTOTUNE = tf.data.experimental.AUTOTUNE  # 自動調整模式
batch_size = 64  # 批次大小
train_num = int(info.splits['train'].num_examples / 10) * 9  # 訓練資料數量

train_data = train_data.shuffle(train_num)  # 打散資料集
# 載入預處理「 parse_aug_fn」function，cpu數量為自動調整模式
train_data = train_data.map(map_func=parse_aug_fn, num_parallel_calls=AUTOTUNE)
# 設定批次大小並將prefetch模式開啟(暫存空間為自動調整模式)
train_data = train_data.batch(batch_size).prefetch(buffer_size=AUTOTUNE)

# 載入預處理「 parse_fn」function，cpu數量為自動調整模式
valid_data = valid_data.map(map_func=parse_fn, num_parallel_calls=AUTOTUNE)
# 設定批次大小並將prefetch模式開啟(暫存空間為自動調整模式)
valid_data = valid_data.batch(batch_size).prefetch(buffer_size=AUTOTUNE)

# 載入預處理「 parse_fn」function，cpu數量為自動調整模式
test_data = test_data.map(map_func=parse_fn, num_parallel_calls=AUTOTUNE)
# 設定批次大小並將prefetch模式開啟(暫存空間為自動調整模式)
test_data = test_data.batch(batch_size).prefetch(buffer_size=AUTOTUNE)

### 2. 使用客自化API訓練網路模型

- Custom Layer：將原本的Conv2d改成CustomConv2D。
- Custom Loss：將原本的CategoricalCrossentropy改成custom_loss。
- Custom Metrics：加入一個新的指標函數CatgoricalTruePositives。
- Custom Callbacks：紀錄每一個batch的Loss值變化。

In [10]:
inputs = keras.Input(shape=(32, 32, 3))
x = CustomConv2D(64, (3, 3))(inputs)
x = layers.MaxPool2D()(x)
x = CustomConv2D(128, (3, 3))(x)
x = CustomConv2D(256, (3, 3))(x)
x = CustomConv2D(128, (3, 3))(x)
x = CustomConv2D(64, (3, 3))(x)
x = layers.Flatten()(x)
x = CustomDense(64, activation='relu')(x)
outputs = CustomDense(10)(x)
# 建立網路模型(將輸入到輸出所有經過的網路層連接起來)
model = keras.Model(inputs, outputs, name='model')
model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 32, 32, 3)]       0         
_________________________________________________________________
custom_conv2d (CustomConv2D) (None, 30, 30, 64)        1792      
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 15, 15, 64)        0         
_________________________________________________________________
custom_conv2d_1 (CustomConv2 (None, 13, 13, 128)       73856     
_________________________________________________________________
custom_conv2d_2 (CustomConv2 (None, 11, 11, 256)       295168    
_________________________________________________________________
custom_conv2d_3 (CustomConv2 (None, 9, 9, 128)         295040    
_________________________________________________________________
custom_conv2d_4 (CustomConv2 (None, 7, 7, 64)          73792 

In [11]:
# 儲存訓練記錄檔
logs_dirs = 'lab6-logs'
model_cbk = keras.callbacks.TensorBoard(log_dir='lab6-logs')

# 紀錄每一個batch的Loss值變化
model_dirs = logs_dirs + '/models'
os.makedirs(model_dirs, exist_ok=True)
custom_save_model = SaveModel(model_dirs + '/custom_save.h5', 
                              monitor='val_custom_catrgorical_accuracy', 
                              mode='max', 
                              save_weights_only=True)

In [12]:
# 設定訓練使用的優化器、損失函數和指標函數
model.compile(keras.optimizers.Adam(), 
           loss=custom_categorical_crossentropy, 
           metrics=[CustomCategoricalAccuracy()])

# 訓練網路模型
model.fit(train_data,
            epochs=20, 
            validation_data=valid_data,
            callbacks=[model_cbk, custom_save_model])





Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<tensorflow.python.keras.callbacks.History at 0x12bcbaaf088>

驗證網路模型：

In [13]:
model.load_weights(model_dirs+'/custom_save.h5')
loss, acc = model.evaluate(test_data)
print("Loss: {}, Accuracy: {}".format(loss, acc))

Loss: 0.751505434513092, Accuracy: 0.7445999979972839
