# FashionMNIST Challenge 之 殘差卷積神經網絡

## 一、算法原理

## 二、算法實現

In [None]:
import keras
import tensorflow as tf
from PIL import Image
from keras.models import model_from_json, load_model
from keras.models import Sequential
from keras.layers import Dense, Activation
import numpy as np
from keras.layers import Conv2D, BatchNormalization, MaxPooling2D, Input
from keras.layers import AveragePooling2D, Flatten, GlobalMaxPooling2D
from keras import layers
from keras.models import Model

def conv_block(input_tensor, bn_axis, filters, phase, name, strides=(1, 1)):
    """
    Conv2D 塊，雙路雙卷積計算
    :param input_tensor:(tensor) 輸入張量
    :param filters:(tuple) 卷積核打包
    :param strides:(int) 卷積步長
    :param BN_axis:(int) 規範化卷積軸
    :return: model
    """
    filters1, filters2, filters3 = filters  # 解包卷積核數量
    Conv_base_name = 'Conv_' + name + '_' + str(phase) + '_phase_'
    BN_base_name = 'BN_' + name + '_' + str(phase) + '_phase_'
    x = Conv2D(
        filters=filters1, kernel_size=(1, 1), strides=strides, name=Conv_base_name + '2a'
    )(input_tensor)
    x = BatchNormalization(axis=bn_axis, name=BN_base_name + '2a')(x)
    x = Activation(activation='relu')(x)

    x = Conv2D(
        filters=filters2, kernel_size=(1, 1), strides=strides, name=Conv_base_name + '2b'
    )(x)
    x = BatchNormalization(axis=bn_axis, name=BN_base_name + '2b')(x)
    x = Activation(activation='relu')(x)

    x = Conv2D(
        filters=filters3, kernel_size=(1, 1), strides=strides, name=Conv_base_name + '2c'
    )(x)
    x = BatchNormalization(axis=bn_axis, name=BN_base_name + '2c')(x)
    x = Activation(activation='relu')(x)

    y = Conv2D(filters3, (1, 1), strides=strides, name=Conv_base_name + '1a')(input_tensor)
    y = BatchNormalization(axis=bn_axis, name=BN_base_name + '1b')(y)

    x = layers.add([x, y])
    a = Activation('relu')(x)

    return a


def identity_block(input_tensor, bn_axis, filters, phase, name, strides=(1, 1)):
    """
    Conv2D 塊，雙路單卷積計算
    :param input_tensor:(tensor) 輸入張量
    :param filters:(tuple) 卷積核打包
    :param strides:(int) 卷積步長
    :param BN_axis:(int) 規範化卷積軸
    :return: model
    """
    filters1, filters2, filters3 = filters  # 解包卷積核數量
    Conv_base_name = 'Conv_' + name + '_' + str(phase) + '_phase_'
    BN_base_name = 'BN_' + name + '_' + str(phase) + '_phase_'
    x = Conv2D(
        filters=filters1, kernel_size=(1, 1), strides=strides, name=Conv_base_name + '2a'
    )(input_tensor)
    x = BatchNormalization(axis=bn_axis, name=BN_base_name + '2a')(x)
    x = Activation(activation='relu')(x)

    x = Conv2D(
        filters=filters2, kernel_size=(1, 1), strides=strides, name=Conv_base_name + '2b'
    )(x)
    x = BatchNormalization(axis=bn_axis, name=BN_base_name + '2b')(x)
    x = Activation(activation='relu')(x)

    x = Conv2D(
        filters=filters3, kernel_size=(1, 1), strides=strides, name=Conv_base_name + '2c'
    )(x)
    x = BatchNormalization(axis=bn_axis, name=BN_base_name + '2c')(x)
    x = Activation(activation='relu')(x)

    x = layers.add([x, input_tensor])
    a = Activation('relu')(x)

    return a


def my_resnet():
    inputs = Input(shape=(1, 28, 28))

    x = Conv2D(
        filters=4, kernel_size=(3, 3), padding='same', name='Conv1', data_format='channels_first')(inputs)
    x = BatchNormalization(axis=1, name='BN_Conv1')(x)
    x = Activation('relu')(x)
    MaxPooling2D(pool_size=(2, 2), strides=(2, 2), data_format='channels_first')(x)

    x = conv_block(input_tensor=x, bn_axis=1, filters=(4, 4, 64), phase=2, name='a')
    x = identity_block(input_tensor=x, bn_axis=1, filters=(4, 4, 64), phase=2, name='b')
    x = identity_block(input_tensor=x, bn_axis=1, filters=(4, 4, 64), phase=2, name='c')

    x = AveragePooling2D((2, 2), name='avg_pool')(x)
    x = Flatten()(x)
    x = Dense(10, activation='softmax', name='softmax10')(x)
    # x = GlobalMaxPooling2D()(x)

    model = Model(inputs, x, name='My_Resnet')
    return model


def create_model():
    """返回一個已創建好的 resnet model"""
    # model = keras.applications.resnet50.ResNet50(include_top=True, weights=None,
    #                                              input_tensor=None, input_shape=(224, 224, 3),
    #                                              pooling='max',
    #                                              classes=10)
    model = my_resnet()
    model.compile(optimizer='rmsprop',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    return model


In [None]:
def read_tfrecord(filename, tensor=[1, 784], num=5000):
    """
    讀取tfrecord文件數據
    :param filename: 文件名
    :param tensor: 緯度
    :param num: 讀取數據數量
    :return: images 圖片數據列表    labels(numpy) 標籤列表    maxlabels(int) 標籤所在項
    """
    filename_queue = tf.train.string_input_producer([filename])
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    features = tf.parse_single_example(serialized_example,
                                       features={
                                           'label': tf.FixedLenFeature([], tf.int64),
                                           'img_val': tf.FixedLenFeature(tensor, tf.float32),
                                       })
    image = tf.cast(features['img_val'], tf.float64)
    label = tf.cast(features['label'], tf.int32)
    images = []
    labels = []
    maxlabels = []
    with tf.Session() as sess:
        init_op = tf.global_variables_initializer()
        sess.run(init_op)
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(coord=coord)
        for i in range(num):
            example, l = sess.run([image, label])
            arr = np.array(example[0])
            arr = arr.reshape((28, 28))
            images.append([arr])
            tem = np.zeros((1, 10))
            tem[0][l] = 1.0
            labels.append(tem[0])
            maxlabels.append([l])
            del tem
        coord.request_stop()
        coord.join(threads=threads)
    images = np.array(images)
    labels = np.array(labels)
    return images, labels, maxlabels


def trun(arr):
    out_arr = []
    for img in arr:
        img = Image.fromarray(img.reshape([28, 28])).resize([224, 224])
        img = np.asarray(img)
        im = []
        for a in img:
            row = []
            for b in a:
                row.append([b, b + 0.0, b + 0.0])
            im.append(row)
            del row
        out_arr.append(im)
        del im
    return out_arr


def getdata(train=55000, test=5000):
    trimages, trlabels, _ = read_tfrecord('train.tfrecords', num=train, )
    teimages, telabels, _ = read_tfrecord('test.tfrecords', num=test, )
    # trimages = trun(trimages)
    # teimages = trun(teimages)

    return trimages, trlabels, teimages, telabels


def main(train=55000, test=5000, batch_size=100, epochs=20):
    trimages, trlabels, tsimages, tslabels = getdata(train=train, test=test)
    # print(trlabels)
    model = create_model()
#     from keras.utils import plot_model
#     plot_model(model, to_file='./Resnet_model.png')
    tem = model.fit(trimages, trlabels, batch_size=batch_size, epochs=epochs, )
    paint(tem.history)

    model.evaluate(tsimages, tslabels, batch_size=batch_size)
    model.save('./resnet.h5')


def paint(hist):
    import matplotlib.pyplot as plt
    %matplotlib notebook
    # fig = plt.figure(figsize=(6, 3))
    ti = []
    for i in range(len(hist['loss'])):
        ti.append(i)
    plt.plot(ti, hist['loss'], c='red')
    plt.plot(ti, hist['acc'], c='blue')
    plt.show()


def test_model(num=1000.0):
    timages, tlabels, _ = read_tfrecord('test.tfrecords', num=int(num))

    model = load_model('./resnet50.h5')
    pre = model.predict(timages)
    tr = 0
    for a, b in zip(tlabels, pre):
        a = np.argmax(a)
        b = np.argmax(b)
        if a == b:
            tr += 1
    ls = tr / num
    print("正確率為：{0}".format(ls))


def train_model():
    from tensorflow.examples.tutorials.mnist import input_data
    data = input_data.read_data_sets('./data/', validation_size=5000)
    model = load_model('./resnet.h5')
    model.fit()


def test():
    from tensorflow.examples.tutorials.mnist import input_data
    data = input_data.read_data_sets('./data/', validation_size=5000)
    data = data.train.next_batch(5000)
    img = []
    label = []
    for i in data[0]:
        i = np.reshape(i, [28, 28])
        img.append(i)
    print("轉換完成")
    for i in data[1]:
        tem = np.zeros((1, 10))
        tem[0][i] = 1.0
        label.append(tem[0])
    print('轉換完成')
    model = create_model()
    model.fit(img, label, batch_size=50, epochs=5, )
    model.evaluate(img, label, batch_size=50)
    model.save('./resnet_test.h5')


In [None]:
if __name__ == '__main__':
    main()

    # test()
    # test_model(num=5000)
    # x_train = np.random.random((2,5,5))
    # print(x_train)

## 三、實踐總結