# 建立VGG16網路模型來辨識貓狗數據集

1. 載入資料

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Default-configured generators, one per data split.
TrainDataGenerator = ImageDataGenerator()
TestDataGenerator = ImageDataGenerator()

# Stream each split from its directory, resizing every image to 224x224.
traindata = TrainDataGenerator.flow_from_directory(
    directory="Cats&Dogs/train",
    target_size=(224, 224),
)
testdata = TestDataGenerator.flow_from_directory(
    directory="Cats&Dogs/test",
    target_size=(224, 224),
)

2. 模型建立

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv2D, MaxPool2D , Flatten


def _vgg16_layers():
    """Return the VGG16 layers in network order.

    Five convolutional stages (each a run of 3x3 "same"-padded ReLU
    convolutions closed by a 2x2 max-pooling) are followed by a Flatten
    layer and three Dense layers; the last Dense layer has 2 softmax units
    and produces the class prediction.
    """
    # (filters, number of 3x3 convolutions) for each stage:
    # 2 x 64, 2 x 128, 3 x 256, 3 x 512, 3 x 512.
    stages = ((64, 2), (128, 2), (256, 3), (512, 3), (512, 3))
    stack = []
    for filters, conv_count in stages:
        for _ in range(conv_count):
            conv_args = dict(filters=filters, kernel_size=(3, 3),
                             padding="same", activation="relu")
            if not stack:
                # Only the very first layer declares the 224x224x3 input.
                conv_args["input_shape"] = (224, 224, 3)
            stack.append(Conv2D(**conv_args))
        stack.append(MaxPool2D(pool_size=(2, 2), strides=(2, 2)))
    # Classifier head: two 4096-unit layers and the 2-way softmax output.
    stack.append(Flatten())
    stack.extend(Dense(units=4096, activation="relu") for _ in range(2))
    stack.append(Dense(units=2, activation="softmax"))
    return stack


model = Sequential(_vgg16_layers())
model.summary()

3. 編譯模型

In [None]:
# Compile the model: Adam optimizer with categorical cross-entropy loss.
from tensorflow.keras.optimizers import Adam
import tensorflow.keras
# `learning_rate` is the supported argument name; the old `lr` alias is
# deprecated and rejected by newer Keras releases.
model.compile(optimizer=Adam(learning_rate=0.00001),
              loss=tensorflow.keras.losses.categorical_crossentropy,
              metrics=['accuracy'])

4. 設定模型儲存條件與提早停止訓練條件

In [None]:
# Configure what is monitored for checkpointing and early stopping.
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
# Save the best model (judged by val_accuracy) to vgg16.h5.
# save_freq must be 'epoch': an integer value counts *batches*, and
# val_accuracy only exists at the end of an epoch, so save_freq=1 combined
# with save_best_only=True would never write a checkpoint.
checkpoint = ModelCheckpoint("vgg16.h5", monitor='val_accuracy', verbose=1,
                             save_best_only=True, save_weights_only=False,
                             mode='auto', save_freq='epoch')
# Stop when val_accuracy has not improved for 20 consecutive epochs.
earlystop = EarlyStopping(monitor='val_accuracy', min_delta=0,
                          patience=20, verbose=1, mode='auto')

5. 開始訓練

In [None]:
# Train the model with the checkpoint and early-stopping callbacks.
# Model.fit accepts generators directly; fit_generator is deprecated and has
# been removed from recent TensorFlow releases.
history = model.fit(traindata, steps_per_epoch=100,
                    validation_data=testdata,
                    validation_steps=10, epochs=50,
                    callbacks=[checkpoint, earlystop])

6. 繪製訓練過程的正確率與損失曲線。從最後的執行結果可以看到，該模型的正確率約為 96%。

In [None]:
import matplotlib.pyplot as plt

# Draw the four recorded curves in a fixed order so that they line up with
# the legend entries below.
for metric_name in ("accuracy", 'val_accuracy', 'loss', 'val_loss'):
    plt.plot(history.history[metric_name])
plt.title("Model Accuracy")
plt.ylabel("Accuracy")
plt.xlabel("Epoch")
plt.legend(["Accuracy","Validation Accuracy","loss","Validation Loss"])
plt.show(block=True)

# List every metric that was recorded during training.
print(history.history.keys())

# 載入模型與資料預測

In [None]:
from tensorflow.keras.preprocessing import image
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.models import load_model

# Read the picture at the 224x224 size and turn it into an array.
img = np.asarray(image.load_img("cat001.jpg", target_size=(224, 224)))
plt.imshow(img)
# Prepend a batch axis before handing the image to predict().
img = np.expand_dims(img, axis=0)
plt.show(block=True)

# Restore the saved model and run the prediction.
saved_model = load_model("vgg16.h5")
output = saved_model.predict(img)
# The first score is compared against the second; the larger decides the label.
print("cat" if output[0][0] > output[0][1] else 'dog')

# Keras fit_generator 函數

In [None]:
from keras.preprocessing.image import ImageDataGenerator
EPOCHS = 100
BS = 32

# NOTE(review): trainX, trainY, testX, testY and model are not defined in
# this cell; presumably they come from earlier code - confirm before running.
# Random augmentation applied on the fly to every training batch.
# (The class name is ImageDataGenerator; the previous spelling
# "IMageDataGenerator" raised a NameError.)
aug = ImageDataGenerator(rotation_range=20, zoom_range=0.15, width_shift_range=0.2,
                         height_shift_range=0.2, shear_range=0.15, horizontal_flip=True, fill_mode="nearest")
# Model.fit consumes the augmenting iterator directly; fit_generator is deprecated.
history = model.fit(aug.flow(trainX, trainY, batch_size=BS), validation_data=(testX, testY),
                    steps_per_epoch=len(trainX) // BS, epochs=EPOCHS)

# 3. 線性回歸範例：比較 fit、fit_generator 與 train_on_batch

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
# Generate 100 random points with NumPy (fixed seed for reproducibility).
np.random.seed(5)
x_data = np.random.rand(100)
# Gaussian noise with mean 0 and standard deviation 0.01.
# (The function is np.random.normal; "mormal" raised an AttributeError.)
noise = np.random.normal(0, 0.01, x_data.shape)
y_data = x_data*0.1+0.2+noise   # noisy line y = 0.1x + 0.2 for the linear regression
# Show the random points.
plt.scatter(x_data, y_data)
plt.show()

In [None]:
# Build the model from a single fully connected layer: one unit, one input.
model = Sequential([Dense(units=1, input_dim=1)])
# Train with SGD on the mean squared error.
model.compile(optimizer='sgd', loss='mse')

def generator(x_data, y_data, size):
    """Yield consecutive ``(x, y)`` mini-batches forever.

    Args:
        x_data: Sliceable sequence of inputs (must support ``len``).
        y_data: Sliceable sequence of targets aligned with ``x_data``.
        size: Number of samples per batch.

    Yields:
        Tuples ``(x, y)`` holding ``size`` samples each; the final batch of
        a pass is shorter when ``len(x_data)`` is not a multiple of
        ``size``. After the last batch, iteration restarts from the
        beginning.

    Raises:
        ValueError: If ``size`` is not positive or ``x_data`` is empty
            (either would otherwise loop forever without yielding).
    """
    if size <= 0:
        raise ValueError("size must be a positive integer")
    total = len(x_data)
    if total == 0:
        raise ValueError("x_data must not be empty")
    while True:
        # Step through the data by the batch size so every sample is used
        # once per pass. The end index is exclusive, so no "- 1" is needed
        # (that dropped the last sample of each batch).
        for start in range(0, total, size):
            stop = start + size
            yield x_data[start:stop], y_data[start:stop]

# Three alternative ways to train are shown: fit, fit_generator and train_on_batch.
# model.fit(x_data, y_data, epochs=1000, batch_size=10)
# model.fit_generator(generator(x_data, y_data, 10), steps_per_epoch=10, epochs=1000, verbose=1)
for step in range(10001):
    # One update on the complete data set per step.
    cost = model.train_on_batch(x_data, y_data)
    if not step % 5:
        print('cost', cost)
# Weights of the single Dense layer.
w, b = model.layers[0].get_weights()
print('w:',w,'b:',b)

# Feed x in, get the predicted y out.
y_pred = model.predict(x_data)
# Random points first, then the prediction drawn as a red line.
plt.scatter(x_data, y_data)
plt.plot(x_data, y_pred, 'r-', lw=3)
plt.show()

# GoogLeNet

In [None]:
from tensorflow.keras import layers, Model, models, Sequential

class Inception(layers.Layer):
    """Inception block: four parallel branches whose outputs are concatenated.

    Args:
        ch1x1: Filters of the 1x1 convolution branch.
        ch3x3red: Filters of the 1x1 convolution in front of the 3x3 one.
        ch3x3: Filters of the 3x3 convolution.
        ch5x5red: Filters of the 1x1 convolution in front of the 5x5 one.
        ch5x5: Filters of the 5x5 convolution.
        pool_proj: Filters of the 1x1 convolution after the max-pooling.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, ch1x1, ch3x3red, ch3x3, ch5x5red, ch5x5, pool_proj, **kwargs):
        super().__init__(**kwargs)
        # Branch 1: a single 1x1 convolution.
        self.branch1 = layers.Conv2D(ch1x1, kernel_size=1, activation="relu")

        # Branch 2: 1x1 convolution followed by a "SAME"-padded 3x3 convolution.
        reduce_3x3 = layers.Conv2D(ch3x3red, kernel_size=1, activation="relu")
        conv_3x3 = layers.Conv2D(ch3x3, kernel_size=3, padding="SAME", activation="relu")
        self.branch2 = Sequential([reduce_3x3, conv_3x3])

        # Branch 3: 1x1 convolution followed by a "SAME"-padded 5x5 convolution.
        reduce_5x5 = layers.Conv2D(ch5x5red, kernel_size=1, activation="relu")
        conv_5x5 = layers.Conv2D(ch5x5, kernel_size=5, padding="SAME", activation="relu")
        self.branch3 = Sequential([reduce_5x5, conv_5x5])

        # Branch 4: stride-1 3x3 max-pooling followed by a 1x1 convolution.
        pool_3x3 = layers.MaxPool2D(pool_size=3, strides=1, padding="SAME")
        project = layers.Conv2D(pool_proj, kernel_size=1, activation="relu")
        self.branch4 = Sequential([pool_3x3, project])

    def call(self, inputs, **kwargs):
        """Send ``inputs`` through branches 1-4 and concatenate the results."""
        branches = (self.branch1, self.branch2, self.branch3, self.branch4)
        return layers.concatenate([branch(inputs) for branch in branches])

輔助分類器

In [None]:
# Auxiliary classifier
class InceptionAux(layers.Layer):
    """Auxiliary classification head attached to an intermediate feature map.

    Pipeline: average pooling -> 1x1 convolution -> flatten -> dropout ->
    Dense(1024) -> dropout -> Dense(num_classes) -> softmax.

    Args:
        num_classes: Number of units of the final Dense layer, i.e. the
            number of classes.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, num_classes, **kwargs):
        super(InceptionAux, self).__init__(**kwargs)
        # Layer 1: average pooling.
        self.averagePool = layers.AvgPool2D(pool_size=5, strides=3)
        # Layer 2: 1x1 convolution.
        self.conv = layers.Conv2D(128, kernel_size=1, activation="relu")
        # Flatten and dropout layers are created once here rather than on
        # every forward pass inside call().
        self.flatten = layers.Flatten()
        self.dropout1 = layers.Dropout(rate=0.5)
        # Layer 3: fully connected layer.
        self.fc1 = layers.Dense(1024, activation="relu")
        self.dropout2 = layers.Dropout(rate=0.5)
        # Layer 4: fully connected layer with one unit per class.
        self.fc2 = layers.Dense(num_classes)
        # Layer 5: softmax.
        self.softmax = layers.Softmax()

    def call(self, inputs, training=None, **kwargs):
        """Return the softmax output of the auxiliary head.

        Args:
            inputs: Feature map to classify.
            training: Forwarded to the dropout layers so that they are only
                active while training.
            **kwargs: Accepted for interface compatibility; unused.
        """
        x = self.averagePool(inputs)
        x = self.conv(x)
        x = self.flatten(x)
        x = self.dropout1(x, training=training)
        x = self.fc1(x)
        x = self.dropout2(x, training=training)
        x = self.fc2(x)
        x = self.softmax(x)

        return x

第一、二模塊

In [None]:
def GoogLeNet(im_height=224, im_width=224, class_num=1000, aux_logits=False):
    """Build the GoogLeNet model.

    The shape comments in the body describe the tensor entering the next
    layer and hold for the default 224x224 input size.

    Args:
        im_height: Height of the input images.
        im_width: Width of the input images.
        class_num: Number of output classes.
        aux_logits: When true, the two auxiliary classifier outputs are
            added to the model outputs.

    Returns:
        A Keras model whose output is the main softmax output, or the list
        ``[aux1, aux2, main]`` when ``aux_logits`` is true.
    """
    # TensorFlow tensors use the NHWC channel ordering.
    input_image = layers.Input(shape=(im_height, im_width, 3), dtype="float32")
    # (1) First module:
    # (None, 224, 224, 3)
    x = layers.Conv2D(64, kernel_size=7, strides=2, padding="SAME", activation="relu", name="conv2d_1")(input_image)
    # (None, 112, 112, 64)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="maxpool_1")(x)
    # (2) Second module:
    # (None, 56, 56, 64)
    x = layers.Conv2D(64, kernel_size=1, activation="relu", name="conv2d_2")(x)
    # (None, 56, 56, 64)
    x = layers.Conv2D(192, kernel_size=3, padding="SAME", activation="relu", name="conv2d_3")(x)
    # (None, 56, 56, 192)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="maxpool_2")(x)

第三模塊(Inception 3a、3b層)

In [None]:
    # (3) Third module: Inception 3a and 3b, then a stride-2 max-pooling.
    # Each shape comment describes the tensor entering the next layer.
    # (None, 28, 28, 192)
    x = Inception(64, 96, 128, 16, 32, 32, name="inception_3a")(x)
    # (None, 28, 28, 256)
    x = Inception(128, 128, 192, 32, 96, 64, name="inception_3b")(x)
    # (None, 28, 28, 480)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="maxpool_3")(x)

第四模塊(Inception 4a、4b、4c、4d、4e層)

In [None]:
    # (4) Fourth module: Inception 4a-4e, then a stride-2 max-pooling.
    # (None, 14, 14, 480)
    x = Inception(192, 96, 208, 16, 48, 64, name="inception_4a")(x)
    # First auxiliary classifier branches off the output of 4a.
    if aux_logits:
        aux1 = InceptionAux(class_num, name="aux_1")(x)

    # (None, 14, 14, 512)
    x = Inception(160, 112, 224, 24, 64, 64, name="inception_4b")(x)
    # (None, 14, 14, 512)
    x = Inception(128, 128, 256, 24, 64, 64, name="inception_4c")(x)
    # (None, 14, 14, 512)
    x = Inception(112, 144, 288, 32, 64, 64, name="inception_4d")(x)
    # Second auxiliary classifier branches off the output of 4d.
    if aux_logits:
        aux2 = InceptionAux(class_num, name="aux_2")(x)

    # (None, 14, 14, 528)
    x = Inception(256, 160, 320, 32, 128, 128, name="inception_4e")(x)
    # (None, 14, 14, 832)  -- 256 + 320 + 128 + 128 channels
    x = layers.MaxPool2D(pool_size=3, strides=2, padding="SAME", name="maxpool_4")(x)

第五模塊與最後輸出

In [None]:
    # (5) Fifth module and the final output.
    # (None, 7, 7, 832)
    x = Inception(256, 160, 320, 32, 128, 128, name="inception_5a")(x)
    # (None, 7, 7, 832)
    x = Inception(384, 192, 384, 48, 128, 128, name="inception_5b")(x)
    # (None, 7, 7, 1024)
    x = layers.AvgPool2D(pool_size=7, strides=1, name="avgpool_1")(x)
    
    # (None, 1, 1, 1024)
    x = layers.Flatten(name="output_flatten")(x)
    # (None, 1024)
    x = layers.Dropout(rate=0.4, name="output_dropout")(x)
    x = layers.Dense(class_num, name="output_dense")(x)
    # (None, class_num)
    # Main classifier output (named "aux_3" although it is the primary head).
    aux3 = layers.Softmax(name="aux_3")(x)

    # With aux_logits the model exposes both auxiliary heads plus the main one.
    if aux_logits:
        model = models.Model(inputs=input_image, outputs=[aux1, aux2, aux3])
    else:
        model = models.Model(inputs=input_image, outputs=aux3)
    return model

產生模型實體與定義模型輸入大小與輸出類別大小

In [None]:
# Input height and width of the network, and the mini-batch size.
im_height = im_width = 224
batch_size = 32
# Ten output classes, with the auxiliary classifier outputs enabled.
model = GoogLeNet(im_height=im_height, im_width=im_width, class_num=10, aux_logits=True)
model.summary()

定義模型使用的優化函數與損失函數

In [None]:
import tensorflow as tf

# Optimizer and loss provided by Keras. from_logits=False: the loss is fed
# probabilities, not raw logits.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0003)
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)

# Running loss averages for the training and validation phases.
train_loss = tf.keras.metrics.Mean(name='train_loss')
val_loss = tf.keras.metrics.Mean(name='val_loss')

# Running accuracies for the training and validation phases.
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
val_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='val_accuracy')

定義訓練階段函數與驗證階段函數

In [None]:
# Training-phase step function
@tf.function
def train_step(images, labels):
    """Run one optimisation step on a batch and update the training metrics.

    The total loss is the main loss plus both auxiliary losses, each
    weighted by 0.3.

    Args:
        images: Batch of input images.
        labels: Labels of the batch.
    """
    with tf.GradientTape() as tape:
        aux_pred1, aux_pred2, main_pred = model(images, training=True)
        aux_loss1 = loss_object(labels, aux_pred1)
        aux_loss2 = loss_object(labels, aux_pred2)
        main_loss = loss_object(labels, main_pred)
        total_loss = aux_loss1 * 0.3 + aux_loss2 * 0.3 + main_loss
    weights = model.trainable_variables
    grads = tape.gradient(total_loss, weights)
    optimizer.apply_gradients(zip(grads, weights))

    # Update the training performance metrics.
    train_loss(total_loss)
    train_accuracy(labels, main_pred)
# Validation-phase step function
@tf.function
def val_step(images, labels):
    """Evaluate one batch and update the validation metrics.

    Only the third (main) model output is scored; the two auxiliary
    outputs are discarded.

    Args:
        images: Batch of input images.
        labels: Labels of the batch.
    """
    _, _, main_pred = model(images, training=False)
    batch_loss = loss_object(labels, main_pred)
    val_loss(batch_loss)
    val_accuracy(labels, main_pred)

載入資料集並進行前處理

In [None]:
from tensorflow.keras.datasets import cifar10
import tensorflow as tf
# Load CIFAR-10: 50000 training and 10000 test images, each 32x32 with 3 channels.
(train_Data, train_Label), (test_Data, test_Label) = cifar10.load_data()

# Split: the first 5000 training samples become the validation set and the
# remainder stays in the training set.
validation_data = train_Data[:5000]
validation_label = train_Label[:5000]
train_Data = train_Data[5000:]
train_Label = train_Label[5000:]

batchs = 32
resize = 224

# Wrap every split as a dataset of (image, label) pairs.
train_ds = tf.data.Dataset.from_tensor_slices((train_Data, train_Label))
test_ds = tf.data.Dataset.from_tensor_slices((test_Data, test_Label))
validation_ds = tf.data.Dataset.from_tensor_slices(
    (validation_data, validation_label)
)

def preprocess(image, label):
    """Standardize one image and resize it to ``resize`` x ``resize``.

    Args:
        image: Image tensor to transform.
        label: Label belonging to the image; returned unchanged.

    Returns:
        The ``(image, label)`` pair with the transformed image.
    """
    standardized = tf.image.per_image_standardization(image)
    return tf.image.resize(standardized, (resize, resize)), label

batch_size = 360
# Training data: preprocess, shuffle with a 1000-element buffer, then batch.
train_ds = train_ds.map(preprocess)
train_ds = train_ds.shuffle(1000)
train_ds = train_ds.batch(batch_size=batch_size)
# Validation and test data are not shuffled.
validation_ds = validation_ds.map(preprocess)
validation_ds = validation_ds.batch(batch_size=batch_size)
test_ds = test_ds.map(preprocess)
test_ds = test_ds.batch(batch_size=batch_size)

開始進行訓練，並在每一個 epoch 結束後進行驗證集驗證

In [None]:
epochs = 10
for epoch in range(epochs):
    # Clear the running metrics so that each epoch is reported on its own.
    train_loss.reset_states()
    train_accuracy.reset_states()
    val_loss.reset_states()
    val_accuracy.reset_states()
    print("epoch = ",epoch + 1)
    for batch_data, batch_label in train_ds:
        train_step(batch_data, batch_label)
        # Running training metrics after every batch.
        print("train_accuracy = ",train_accuracy.result())
        print("train_loss = ",train_loss.result())

    # Validate on the held-out validation split. Using test_ds here would
    # report test-set numbers as "validation" and leave validation_ds unused.
    for batch_data, batch_label in validation_ds:
        val_step(batch_data, batch_label)
    # Report the validation accuracy and loss once per epoch.
    print("val_accuracy = ",val_accuracy.result())
    print("val_loss = ",val_loss.result())

# BasicBlock

In [None]:
import tensorflow as tf
from tensorflow.keras import layers,Sequential

class BasicBlock(layers.Layer):
    """Residual block: two 3x3 convolutions plus a shortcut connection.

    Args:
        filter_num: Number of filters of both 3x3 convolutions (and of the
            1x1 shortcut convolution, when one is created).
        stride: Stride of the first convolution. Any value other than 1
            also inserts a strided 1x1 convolution on the shortcut.
    """

    def __init__(self, filter_num, stride=1):
        super().__init__()
        # f(x), first stage: 3x3 convolution -> batch normalization -> ReLU.
        self.conv1 = layers.Conv2D(filter_num, (3, 3),
                                   strides=stride, padding = 'same')
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.Activation('relu')
        # f(x), second stage: stride-1 3x3 convolution -> batch normalization.
        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1,
                                   padding='same')
        self.bn2 = layers.BatchNormalization()
        if stride == 1:
            # Nothing to downsample: the shortcut passes the input through.
            self.downsample = lambda x: x
        else:
            # Downsample the shortcut with a strided 1x1 convolution.
            self.downsample = Sequential([
                layers.Conv2D(filter_num, (1, 1), strides=stride),
            ])

In [None]:
    def call(self, inputs, training=None):
        """Compute ``relu(f(inputs) + shortcut(inputs))``.

        Args:
            inputs: Input feature map.
            training: Accepted for the Keras call interface; not referenced
                directly in this method.

        Returns:
            The activated sum of the main path and the shortcut.
        """
        # Main path f(x): conv -> BN -> ReLU -> conv -> BN.
        residual = self.relu(self.bn1(self.conv1(inputs)))
        residual = self.bn2(self.conv2(residual))
        # Shortcut path: identity or the downsampling convolution.
        shortcut = self.downsample(inputs)
        # f(x) + x, passed through the final ReLU.
        return tf.nn.relu(layers.add([residual, shortcut]))

利用堆疊方式來擴大ResBlock模塊

In [None]:
    # Build a stack of several residual blocks.
    def build_resblock(self, filter_num, blocks, stride=1):
        """Return a ``Sequential`` of ``blocks`` BasicBlock layers.

        Args:
            filter_num: Number of filters used by every block.
            blocks: Number of BasicBlock layers in the stack.
            stride: Stride of the first block only.
        """
        # Only the first block receives the requested stride (and may thus
        # downsample); every following block uses stride 1.
        block_strides = [stride] + [1] * (blocks - 1)
        return Sequential([BasicBlock(filter_num, s) for s in block_strides])

利用build_resblock完成Resnet18網路

In [None]:
from tensorflow import keras
# ResNet assembled from four stacked residual stages.
class ResNet(keras.Model):
    """ResNet built from four stages of BasicBlock layers.

    Args:
        layer_dims: Number of BasicBlock layers in each of the four stages,
            e.g. ``[2, 2, 2, 2]`` for four stages of two blocks each.
        num_classes: Number of units of the final Dense layer, i.e. the
            number of output classes.
    """

    def __init__(self, layer_dims, num_classes=10):
        super(ResNet, self).__init__()

        # Stem: sets the initial channel count before the residual stages.
        self.Prelayer = Sequential([
            layers.Conv2D(64, (3, 3), strides=(1, 1)),
            layers.BatchNormalization(),
            layers.Activation('relu'),
            layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1),
                             padding='same')
        ])
        # Four residual stages; stages 2-4 start with a stride-2 block.
        self.layer1 = self.build_resblock(64, layer_dims[0])
        self.layer2 = self.build_resblock(128, layer_dims[1], stride=2)
        self.layer3 = self.build_resblock(256, layer_dims[2], stride=2)
        self.layer4 = self.build_resblock(512, layer_dims[3], stride=2)
        # Global pooling reduces width and height to 1x1.
        self.avgpool = layers.GlobalAveragePooling2D()
        # Final fully connected classification layer.
        self.fc = layers.Dense(num_classes,activation = 'softmax')

    def build_resblock(self, filter_num, blocks, stride=1):
        """Return a ``Sequential`` stack of ``blocks`` BasicBlock layers.

        Defined on the class because ``__init__`` calls
        ``self.build_resblock``; without it, construction fails with an
        AttributeError.

        Args:
            filter_num: Number of filters used by every block.
            blocks: Number of BasicBlock layers in the stack.
            stride: Stride of the first block; all later blocks use 1.
        """
        stage = Sequential()
        stage.add(BasicBlock(filter_num, stride))
        for _ in range(1, blocks):
            stage.add(BasicBlock(filter_num, stride=1))
        return stage

    # Forward computation of the whole network.
    def call(self,inputs, training=None, **kwargs):
        """Return the class probabilities for a batch of images.

        Args:
            inputs: Batch of input images.
            training: Forwarded explicitly to the stem and the residual
                stages, which contain batch normalization.
            **kwargs: Accepted for interface compatibility; unused.
        """
        x = self.Prelayer(inputs, training=training)
        # Pass through the four residual stages.
        x = self.layer1(x, training=training)
        x = self.layer2(x, training=training)
        x = self.layer3(x, training=training)
        x = self.layer4(x, training=training)
        # Shape is now [batch size, channels].
        x = self.avgpool(x)
        # Shape is now [batch size, num_classes].
        x = self.fc(x)
        return x

創建ResNet18 網路

In [None]:
def resnet18():
    """Create a ResNet with four stages of two blocks each."""
    # Other ResNet variants are obtained by changing the per-stage block counts.
    blocks_per_stage = [2, 2, 2, 2]
    return ResNet(blocks_per_stage)

# ResNet18

加載CIFAR10數據集

In [None]:
import tensorflow as tf
from CH12_BasicBlock import resnet18
from tensorflow.keras.datasets import cifar10
# Load CIFAR-10: 50000 training and 10000 test images, each 32x32 with 3 channels.
(train_Data, train_Label), (test_Data, test_Label) = cifar10.load_data()

# Remove the redundant axis 1 from the label arrays.
train_Label = tf.squeeze(train_Label, axis=1)
test_Label = tf.squeeze(test_Label, axis=1)

# Add a validation set: the first 5000 training samples; the rest is used
# for training.
validation_data = train_Data[:5000]
validation_label = train_Label[:5000]
Newtrain_Data = train_Data[5000:]
Newtrain_Label = train_Label[5000:]

# Wrap every split as a dataset of (image, label) pairs.
train_ds = tf.data.Dataset.from_tensor_slices((Newtrain_Data, Newtrain_Label))
test_ds = tf.data.Dataset.from_tensor_slices((test_Data, test_Label))
validation_ds = tf.data.Dataset.from_tensor_slices(
    (validation_data, validation_label)
)

資料前處理與訓練資料打散

In [None]:
def preprocess(image, label):
    """Cast the image to float32 and divide it by 255; cast the label to int32.

    Returns:
        The converted ``(image, label)`` pair.
    """
    scaled = tf.cast(image, dtype=tf.float32) / 255.
    return scaled, tf.cast(label, dtype=tf.int32)

batch_size = 256
epoch = 30
# Training data: preprocess, shuffle with a 500-element buffer, then batch.
train_ds = train_ds.map(preprocess)
train_ds = train_ds.shuffle(500)
train_ds = train_ds.batch(batch_size=batch_size)
# Validation and test data are not shuffled.
test_ds = test_ds.map(preprocess)
test_ds = test_ds.batch(batch_size=batch_size)
validation_ds = validation_ds.map(preprocess)
validation_ds = validation_ds.batch(batch_size=batch_size)

網路編譯與訓練

In [None]:
model = resnet18()
# Build the model for batches of 32x32 images with 3 channels.
model.build(input_shape=(None, 32, 32, 3))

# Adam optimizer, sparse categorical cross-entropy loss, accuracy metric.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)
# Train for `epoch` epochs, validating after every epoch.
History = model.fit(
    train_ds,
    epochs=epoch,
    validation_data=validation_ds,
    validation_freq=1,
)