In [1]:
import numpy as np

import tensorflow as tf 
from tensorflow import keras 
from tensorflow.keras import layers

for gpu in tf.config.list_physical_devices('GPU'):
    tf.config.experimental.set_memory_growth(gpu, True)

In [2]:
# 通常，我们将批量归一化层置于全连接层中的仿射变换和激活函数之间。
# 对卷积层来说，批量归一化发生在卷积计算之后、应用激活函数之前。
# 如果卷积计算输出多个通道，我们需要对这些通道的输出分别做批量归一化，且每个通道都拥有独立的拉伸和偏移参数，并均为标量。

# 使用批量归一化训练时，我们可以将批量大小设得大一点，从而使批量内样本的均值和方差的计算都较为准确。
# 将训练好的模型用于预测时，单个样本的输出不应取决于批量归一化所需要的随机小批量中的均值和方差。
# 一种常用的方法是通过移动平均估算整个训练数据集的样本均值和方差，并在预测时使用它们得到确定的输出。


In [4]:
def batch_norm(is_training, X, gamma, beta, running_mean, running_var, eps, momentum):
    if not is_training:
        X_hat = (X - running_mean) / np.sqrt(running_var + eps)
    else:
        if len(X.shape) == 2:  # dense layer
            mean = np.mean(X, axis=0)
            var = np.mean((X - mean) ** 2, axis=0)
        else:  # conv
            mean = np.mean(X, axis=(0, 2, 3), keepdims=True)
            var = np.mean((X - mean) ** 2, axis=(0, 2, 3), keepdims=True)
        X_hat = (X - mean) / np.sqrt(var + eps)
        running_mean = momentum * running_mean + (1.0 - momentum) * mean
        running_var = momentum * running_var + (1.0 - momentum) * var
    Y = gamma * X_hat + beta
    return Y, running_mean, running_var

In [9]:
class BatchNormalization(layers.Layer):
    def __init__(self, decay=0.9, epsilon=1e-5, **kwargs):
        self.decay = decay
        self.epsilon = epsilon
        super().__init__(**kwargs)
        return 
    
    def build(self, input_shape):
        self.gamma = self.add_weight(
            name='gamma', 
            shape=[input_shape[-1]], 
            initializer=tf.initializers.ones, 
            trainable=True
        )
        self.beta = self.add_weight(
            name='beta', 
            shape=[input_shape[-1]], 
            initializer=tf.initializers.zeros, 
            trainable=True
        )
        self.running_mean = self.add_weight(
            name='running_mean', 
            shape=[input_shape[-1]], 
            initializer=tf.initializers.zeros, 
            trainable=False
        )
        self.running_var = self.add_weight(
            name='running_var', 
            shape=[input_shape[-1]], 
            initializer=tf.initializers.ones, 
            trainable=False
        )
        super().build(input_shape)
        return 
    
    def assign_running_mean(self, variable, value):
        delta = variable * self.decay + value * (1 - self.decay)
        return variable.assign(delta)
    
    @tf.function
    def call(self, inputs, training):
        if not training:
            mean = self.running_mean
            var = self.running_var
        else:
            batch_mean, batch_var = tf.nn.moments(inputs, list(range(len(inputs.shape) - 1)))
            mean_update = self.assign_running_mean(self.running_mean, batch_mean)
            var_udpate = self.assign_running_mean(self.running_var, batch_var)
            self.add_update(mean_update)
            self.add_update(var_udpate)
            mean = batch_mean
            var = batch_var
        output = tf.nn.batch_normalization(
            inputs, 
            mean=mean, 
            variance=var, 
            offset=self.beta, 
            scale=self.gamma, 
            variance_epsilon=self.epsilon
        )
        return output
    
    def compute_output_shape(self, input_shape):
        return input_shape
    

In [10]:
net = keras.Sequential([
    layers.Conv2D(6, kernel_size=5), 
    BatchNormalization(), 
    layers.Activation('sigmoid'), 
    layers.MaxPool2D(pool_size=2, strides=2), 
    
    layers.Conv2D(16, kernel_size=5), 
    BatchNormalization(), 
    layers.Activation('sigmoid'),
    layers.MaxPool2D(pool_size=2, strides=2), 
    
    layers.Flatten(), 
    
    layers.Dense(120), 
    BatchNormalization(), 
    layers.Activation('sigmoid'), 
    
    layers.Dense(84), 
    BatchNormalization(), 
    layers.Activation('sigmoid'),
    
    layers.Dense(10, activation='sigmoid'),
])

In [11]:
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = x_train.reshape((60000, 28, 28, 1)).astype('float32') / 255
x_test = x_test.reshape((10000, 28, 28, 1)).astype('float32') / 255

net.compile(loss='sparse_categorical_crossentropy',
              optimizer=tf.keras.optimizers.RMSprop(),
              metrics=['accuracy'])
history = net.fit(x_train, y_train,
                    batch_size=64,
                    epochs=5,
                    validation_split=0.2)
test_scores = net.evaluate(x_test, y_test, verbose=2)
print('Test loss:', test_scores[0])
print('Test accuracy:', test_scores[1])


Train on 48000 samples, validate on 12000 samples
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
10000/10000 - 1s - loss: 0.1524 - accuracy: 0.9511
Test loss: 0.15243182272315026
Test accuracy: 0.9511


In [12]:
net.get_layer(index=1).gamma,net.get_layer(index=1).beta

(<tf.Variable 'sequential_1/batch_normalization_4/gamma:0' shape=(6,) dtype=float32, numpy=
 array([1.0482427 , 1.0853697 , 1.1533467 , 0.89600337, 1.1308107 ,
        0.9885612 ], dtype=float32)>,
 <tf.Variable 'sequential_1/batch_normalization_4/beta:0' shape=(6,) dtype=float32, numpy=
 array([-0.49997556,  0.38060945, -0.79884404, -0.21281163,  0.23447165,
        -0.02724187], dtype=float32)>)

In [13]:
net = keras.Sequential([
    layers.Conv2D(6, kernel_size=5), 
    layers.BatchNormalization(), 
    layers.Activation('sigmoid'), 
    layers.MaxPool2D(pool_size=2, strides=2), 
    
    layers.Conv2D(16, kernel_size=5), 
    layers.BatchNormalization(), 
    layers.Activation('sigmoid'),
    layers.MaxPool2D(pool_size=2, strides=2), 
    
    layers.Flatten(), 
    
    layers.Dense(120), 
    layers.BatchNormalization(), 
    layers.Activation('sigmoid'), 
    
    layers.Dense(84), 
    layers.BatchNormalization(), 
    layers.Activation('sigmoid'),
    
    layers.Dense(10, activation='sigmoid'),
])

In [14]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape((60000, 28, 28, 1)).astype('float32') / 255
x_test = x_test.reshape((10000, 28, 28, 1)).astype('float32') / 255

net.compile(loss='sparse_categorical_crossentropy',
              optimizer=tf.keras.optimizers.RMSprop(),
              metrics=['accuracy'])
history = net.fit(x_train, y_train,
                    batch_size=64,
                    epochs=5,
                    validation_split=0.2)
test_scores = net.evaluate(x_test, y_test, verbose=2)
print('Test loss:', test_scores[0])
print('Test accuracy:', test_scores[1])


Train on 48000 samples, validate on 12000 samples
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
10000/10000 - 1s - loss: 0.1705 - accuracy: 0.9503
Test loss: 0.17049654459506272
Test accuracy: 0.9503
