In [1]:
import tensorflow as tf
import numpy as np
from datetime import datetime
import os

num_epochs = 100
batch_size = 128
learning_rate = 0.001
val_l2 = 0.01
data_dir = './data/fer2013/'
train_dir = data_dir + 'train/'
test_dir = data_dir + 'valid/'

checkpoint_path = "training_fer2013_Google_plus_ResNet_2/cp-{epoch:04d}.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)
log_dir = "./tensorboard/fer2013_Google_plus_ResNet_" + datetime.now().strftime("%Y%m%d-%H%M%S")

class BasicBlock(tf.keras.layers.Layer):
# 定义BasicBlock模块，由两个3*3卷积层组成的基本模块
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, use_bias=False):
        super(BasicBlock, self).__init__()

        self.conv1 = tf.keras.layers.Conv2D(out_channels,
                                            kernel_size=3,
                                            strides=stride,
                                            padding="same",
                                            use_bias=use_bias,
                                            kernel_regularizer=tf.keras.regularizers.l2(l=val_l2),
                                            bias_regularizer=tf.keras.regularizers.l2(l=val_l2))
        self.bn1 = tf.keras.layers.BatchNormalization()
        self.conv2 = tf.keras.layers.Conv2D(out_channels,
                                            kernel_size=3,
                                            strides=1,
                                            padding="same",
                                            use_bias=use_bias,
                                            kernel_regularizer=tf.keras.regularizers.l2(l=val_l2),
                                            bias_regularizer=tf.keras.regularizers.l2(l=val_l2))
        self.bn2 = tf.keras.layers.BatchNormalization()

        # 判断stride是否等于1,如果为1就是没有降采样。
        if stride != 1 or in_channels != self.expansion * out_channels:
            self.shortcut = tf.keras.Sequential([tf.keras.layers.Conv2D(self.expansion * out_channels, kernel_size=1, strides=stride, use_bias=use_bias),
                                        tf.keras.layers.BatchNormalization()])
        else:
            self.shortcut = lambda x, _: x


    def call(self, inputs, training=False):
        out = self.conv1(inputs)
        out = self.bn1(out, training=training)
        out = tf.nn.relu(out)
        out = self.conv2(out)
        out = self.bn2(out, training=training)
        out += self.shortcut(inputs, training)
        out = tf.nn.relu(out)

        return out
    
class ResNet(tf.keras.Model):
# ResBlock 模块。继承keras.Model或者keras.Layer都可以

    # 第一个参数layer_dims：[[32,2,1], [64,2,2], [128,2,2], [256,2,2]] 4个Res Block，layer_dims[:][0]表示卷积核个数，layer_dims[:][1]表示blocks数目，layer_dims[:][2]表示stride步数
    # 第二个参数num_classes：我们的全连接输出，取决于输出有多少类。
    def __init__(self, blocks, layer_dims, num_classes=10, use_bias=False):
        super(ResNet, self).__init__()
        
        #检查参数
        if(self.check_param(blocks, layer_dims, num_classes, use_bias) == False):
            return None
        
        self.in_channels = layer_dims[0][0]
        self.use_bias = use_bias

        # 0. 预处理卷积层；实现比较灵活可以加MAXPool2D，或者不加，这里没加。注意这里的channels需要和layer1的channels是一样的，不然能add。
        self.stem = tf.keras.Sequential([tf.keras.layers.Conv2D(self.in_channels, 3, 2, padding="same", use_bias=use_bias,
                                            kernel_regularizer=tf.keras.regularizers.l2(l=val_l2),
                                            bias_regularizer=tf.keras.regularizers.l2(l=val_l2)),
                                        tf.keras.layers.BatchNormalization()])
        #self.pool = tf.keras.layers.MaxPool2D(pool_size=[3, 3], strides=2, padding='same')

        # 1. 创建4个ResBlock
        self.layer1 = self.build_resblock(blocks, out_channels = layer_dims[0][0], num_blocks = layer_dims[0][1], stride = layer_dims[0][2])
        self.layer2 = self.build_resblock(blocks, out_channels = layer_dims[1][0], num_blocks = layer_dims[1][1], stride = layer_dims[1][2])
        self.layer3 = self.build_resblock(blocks, out_channels = layer_dims[2][0], num_blocks = layer_dims[2][1], stride = layer_dims[2][2])
        self.layer4 = self.build_resblock(blocks, out_channels = layer_dims[3][0], num_blocks = layer_dims[3][1], stride = layer_dims[3][2])
        
        self.final_bn  = tf.keras.layers.BatchNormalization()

        self.avgpool = tf.keras.layers.GlobalAveragePooling2D()
        #self.dense = tf.keras.layers.Dense(num_classes, activation=tf.nn.softmax)


    # 2. 创建ResBlock
    def build_resblock(self, blocks, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        res_blocks = tf.keras.Sequential()
        for stride in strides:
            res_blocks.add(blocks(self.in_channels, out_channels, stride, self.use_bias))
            self.in_channels = out_channels

        return res_blocks


    def call(self, inputs, training=False):
        # __init__中准备工作完毕；下面完成前向运算过程。
        out = self.stem(inputs, training)             # (48, 48, 16)
        out = tf.nn.relu(out)
        #out = self.pool(out)

        out = self.layer1(out, training=training)     # (24, 24, 16)
        out = self.layer2(out, training=training)     # (12, 12, 32)
        out = self.layer3(out, training=training)     # (6, 6, 64)
        out = self.layer4(out, training=training)     # (3, 3, 128)

        out = self.final_bn(out, training=training)

        out = tf.nn.relu(out)
        out = self.avgpool(out)                       # (1, 1, 128)
        #out = self.dense(out)

        return out

    def check_param(self, blocks, layer_dims, num_classes, use_bias):
        returntype = False
        if(blocks != BasicBlock and blocks != Bottleneck):
            print('Parameter Error: blocks must be BasicBlock or Bottleneck.')
        elif(np.shape(layer_dims) != (4, 3)):
            print('Parameter Error: layer_dims shape must be (4, 3).')
        elif(type(num_classes) != int or num_classes <= 0):
             print('Parameter Error: num_classes must be greater than  0.')
        elif(type(use_bias) != bool):
             print('Parameter Error: use_bias must be True or False.')
        else:
             returntype = True

        return returntype

class GoogleNet_BasicBlock(tf.keras.layers.Layer):
    def __init__(self, filters_1x1, filters_3x3_reduce, filters_3x3, filters_5x5_reduce, filters_5x5, filters_pool):
        super(GoogleNet_BasicBlock, self).__init__()
        
        self.conv_1x1 = self.conv2d(1, filters_1x1, 1)
        self.conv_3x3_reduce = self.conv2d(1, filters_3x3_reduce, 1)
        self.conv_3x3 = self.conv2d(3, filters_3x3, 1)
        self.conv_5x5_reduce = self.conv2d(1, filters_5x5_reduce, 1)
        self.conv_5x5 = self.conv2d(5, filters_5x5, 1)
        self.pool = tf.keras.layers.MaxPool2D(pool_size=[3, 3], strides=1, padding='same')
        self.conv_pool = self.conv2d(1, filters_pool, 1)
        self.concatenate = tf.keras.layers.Concatenate()

    def call(self, inputs, training=False):
        x1 = self.conv_1x1(inputs)
        x2 = self.conv_3x3_reduce(inputs)
        x2 = self.conv_3x3(x2)
        x3 = self.conv_5x5_reduce(inputs)
        x3 = self.conv_5x5(x3)
        x4 = self.pool(inputs)
        x4 = self.conv_pool(x4)
        output = self.concatenate([x1, x2, x3, x4])
        return output
    
    def conv2d(self, size, filters, stride):
        return tf.keras.layers.Conv2D(
            filters = filters,
            kernel_size = [size, size],
            strides = (stride, stride),
            padding = 'same',
            activation = tf.nn.relu,
            kernel_regularizer=tf.keras.regularizers.l2(l=val_l2),
            bias_regularizer=tf.keras.regularizers.l2(l=val_l2)
        )

class GoogleNet(tf.keras.Model):
    def __init__(self):
        super(GoogleNet, self).__init__()
        
        #self.conv_7x7x64s2 = self.conv2d(7, 64, 2)
        #self.pool3x3 = tf.keras.layers.MaxPool2D(pool_size=[3, 3], strides=2, padding='same')
        self.inception1 = GoogleNet_BasicBlock(16, 8, 16, 4, 8, 8)
        self.bn1 = tf.keras.layers.BatchNormalization()
        self.pool2x2_in1 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.inception2 = GoogleNet_BasicBlock(32, 16, 32, 8, 16, 16)
        self.bn2 = tf.keras.layers.BatchNormalization()
        self.pool2x2_in2 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.inception3 = GoogleNet_BasicBlock(32, 16, 32, 8, 16, 16)
        self.bn3 = tf.keras.layers.BatchNormalization()
        self.pool2x2_in3 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.inception4 = GoogleNet_BasicBlock(64, 16, 64, 16, 32, 32)
        self.bn4 = tf.keras.layers.BatchNormalization()
        self.pool2x2_in4 = tf.keras.layers.MaxPool2D(pool_size=[2, 2], strides=2)
        self.inception5 = GoogleNet_BasicBlock(64, 32, 64, 16, 32, 32)
        self.bn5 = tf.keras.layers.BatchNormalization()
        self.avgpool = tf.keras.layers.GlobalAveragePooling2D()
        
    def call(self, inputs, training=False):
        #out = self.conv_7x7x64s2(inputs)
        #out = self.pool3x3(out)
        
        out = self.inception1(inputs)          # (48, 48, 48)
        out = self.bn1(out, training=training)
        out = self.pool2x2_in1(out)            # (24, 24, 48)
        
        out = self.inception2(out)             # (24, 24, 96)
        out = self.bn2(out, training=training)
        out = self.pool2x2_in2(out)            # (12, 12, 96)
        
        out = self.inception3(out)             # (12, 12, 96)
        out = self.bn3(out, training=training)
        out = self.pool2x2_in3(out)            # (6, 6, 96)
        
        out = self.inception4(out)             # (6, 6, 192)
        out = self.bn4(out, training=training)
        out = self.pool2x2_in4(out)            # (3, 3, 192)
        
        out = self.inception5(out)             # (3, 3, 192)
        out = self.bn5(out, training=training)
           
        out = self.avgpool(out)                # (1, 1, 192)
        return out
    
    def conv2d(self, size, filters, stride):
        return tf.keras.layers.Conv2D(
            filters = filters,
            kernel_size = [size, size],
            strides = (stride, stride),
            padding = 'same',
            activation = tf.nn.relu
        )
    
class GoogleNet_plus_ResNet(tf.keras.Model):
    def __init__(self):
        super(GoogleNet_plus_ResNet, self).__init__()
        
        self.googlenet = GoogleNet()
        self.resnet = ResNet(BasicBlock, [[16,3,1], [32,4,2], [64,4,2], [128,3,2]], use_bias=True)
        self.concatenate = tf.keras.layers.Concatenate()
        self.dense = tf.keras.layers.Dense(units=7, activation=tf.nn.softmax)
        
    def call(self, inputs, training=False):
        x1 = self.googlenet(inputs)
        x2 = self.resnet(inputs)
        out = self.concatenate([x1, x2])
        out = self.dense(out)
        return out

class FixedImageDataGenerator(tf.keras.preprocessing.image.ImageDataGenerator):
    def standardize(self, x):
        if self.featurewise_center:
            x = ((x/255.) - 0.5) * 2.
        return x
    
def train():
    # 0 anger 生气； 1 disgust 厌恶； 2 fear 恐惧； 3 happy 开心； 4 sad 伤心；5 surprised 惊讶； 6 normal 中性
    datagen = FixedImageDataGenerator(
        rescale=1./255,
        featurewise_center=True,  # 数据集去中心化
        rotation_range=10,        # 旋转角度范围
        width_shift_range=0.1,    # 水平位置平移
        height_shift_range=0.1,   # 上下位置平移
        #horizontal_flip=True,   # 随机水平翻转
        fill_mode='nearest'     # 填充模式
        )
    test_datagen =  FixedImageDataGenerator(rescale=1./255)
    
    train_generator = datagen.flow_from_directory(
        train_dir,  # this is the target directory
        target_size=(48, 48),  # all images will be resized to 48x48
        batch_size=batch_size,
        class_mode='sparse')  # since we use sparse_categorical_crossentropy loss, we need categorical labels
    test_generator = datagen.flow_from_directory(
        test_dir,
        target_size=(48, 48),
        batch_size=batch_size,
        class_mode='sparse')
    
    model.fit_generator(
         train_generator,
         steps_per_epoch=28709 / batch_size,
         epochs=num_epochs,
         verbose=1,
         validation_data=test_generator,
         callbacks=[cp_callback_mc, tensorboard_callback]
         )

def test():
    # 构建测试数据集
    test_datagen =  FixedImageDataGenerator(rescale=1./255)
    test_generator = test_datagen.flow_from_directory(
        test_dir,
        target_size=(48, 48),
        batch_size=batch_size,
        class_mode='sparse')
    
    print(model.evaluate(test_generator))
    
if __name__ ==  '__main__':
    model = GoogleNet_plus_ResNet()
    model.build(input_shape=(None, 48, 48, 3))
    model.summary()
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
                    loss=tf.keras.losses.sparse_categorical_crossentropy,
                    metrics=[tf.keras.metrics.sparse_categorical_accuracy])
    
    cp_callback_mc = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                        save_weights_only=True,
                                                        verbose=0)
    latest = tf.train.latest_checkpoint(checkpoint_dir)
    if(latest != None):
        model.load_weights(latest).expect_partial()

    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)
    tensorboard_callback.set_model(model)

Model: "google_net_plus__res_net"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
google_net (GoogleNet)       multiple                  124416    
_________________________________________________________________
res_net (ResNet)             multiple                  1191200   
_________________________________________________________________
concatenate_5 (Concatenate)  multiple                  0         
_________________________________________________________________
dense (Dense)                multiple                  2247      
Total params: 1,317,863
Trainable params: 1,312,615
Non-trainable params: 5,248
_________________________________________________________________


In [None]:
    train()

In [None]:
    test()