# 编写自定义回调

### 介绍

回调是一种强大的工具，可以在训练，评估或预测期间自定义Keras模型的行为。示例包括通过TensorBoard的[`tf.keras.callbacks.TensorBoard`](https://tensorflow.google.cn/api_docs/python/tf/keras/callbacks/TensorBoard)可视化训练进度和结果，和[`tf.keras.callbacks.ModelCheckpoint`](https://tensorflow.google.cn/api_docs/python/tf/keras/callbacks/ModelCheckpoint)在训练期间定期保存模型。

在本指南中，你将了解Keras回调是什么，它可以做什么以及如何构建自己的回调。我们提供了一些简单的回调示例，以帮助你入门。

### 引入

In [None]:
import tensorflow as tf
from tensorflow import keras

### Keras回调概述

所有回调都是`keras.callbacks.Callback`类的子类，并覆盖在训练，评估和预测的各个阶段调用的一组方法。回调对于在训练期间了解模型的内部状态和统计信息很有用。

你可以将回调列表（使用关键字参数`callbacks`）传递给以下模型方法：

+ [`keras.Model.fit()`](https://tensorflow.google.cn/api_docs/python/tf/keras/Model#fit)
+ [`keras.Model.evaluate()`](https://tensorflow.google.cn/api_docs/python/tf/keras/Model#evaluate)
+ [`keras.Model.predict()`](https://tensorflow.google.cn/api_docs/python/tf/keras/Model#predict)

### 回调方法概述

#### 全局方法

`on_(train|test|predict)_begin(self, logs=None)`

在`fit`/`evaluate`/`predict`开始时调用。

`on_(train|test|predict)_end(self, logs=None)`
在`fit`/`evaluate`/`predict`结束时调用。

#### 用于训练/验证/预测的批处理方法

`on_(train|test|predict)_batch_begin(self, batch, logs=None) `

在训练/验证/预测期间，处理批次之前立即调用。

`on_(train|test|predict)_batch_end(self, batch, logs=None)`

在训练/验证/预测批次结束时调用，在此方法中， logs是包含指标结果的字典。

#### epoch级别的方法（仅训练）

`on_epoch_begin(self, epoch, logs=None)`

在训练期间的epoch开始时调用。

`on_epoch_end(self, epoch, logs=None)`

在训练期间的epoch末尾调用。

### 一个基本的示例

让我们看一个具体的例子，首先，让我们导入tensorflow并定义一个简单的Sequential模型：

In [None]:
# 定义Keras模型用于添加回调
def get_model():
    model = keras.Sequential()
    model.add(keras.layers.Dense(1, input_dim=784))
    model.compile(
        optimizer=keras.optimizers.RMSprop(learning_rate=0.1),
        loss="mean_squared_error",
        metrics=["mean_absolute_error"],
    )
    return model

然后，从Keras数据集API中加载MNIST数据用于训练和测试：

In [None]:
# 加载MNIST数据并进行预处理
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(-1, 784).astype("float32") / 255.0
x_test = x_test.reshape(-1, 784).astype("float32") / 255.0

# 仅使用1000个样本
x_train = x_train[:1000]
y_train = y_train[:1000]
x_test = x_test[:1000]
y_test = y_test[:1000]

现在，定义一个简单的自定义回调来记录：

+ `fit`/`evaluate`/`predict`何时开始和结束
+ 每个epoch何时开始和结束
+ 每批训练何时开始和结束
+ 每个评估（测试）批量何时开始和结束
+ 每个推断（预测）批量何时开始和结束

In [None]:
class CustomCallback(keras.callbacks.Callback):
    def on_train_begin(self, logs=None):
        # print(logs)
        # keys = list(logs.keys())
        print("Starting training; got log keys: {}".format(logs))

    def on_train_end(self, logs=None):
        # keys = list(logs.keys())
        print("Stop training; got log keys: {}".format(logs))

    def on_epoch_begin(self, epoch, logs=None):
        # keys = list(logs.keys())
        print("Start epoch {} of training; got log keys: {}".format(epoch, logs))

    def on_epoch_end(self, epoch, logs=None):
        # keys = list(logs.keys())
        print("End epoch {} of training; got log keys: {}".format(epoch, logs))

    def on_test_begin(self, logs=None):
        # keys = list(logs.keys())
        print("Start testing; got log keys: {}".format(logs))

    def on_test_end(self, logs=None):
        # keys = list(logs.keys())
        print("Stop testing; got log keys: {}".format(logs))

    def on_predict_begin(self, logs=None):
        # keys = list(logs.keys())
        print("Start predicting; got log keys: {}".format(logs))

    def on_predict_end(self, logs=None):
        # keys = list(logs.keys())
        print("Stop predicting; got log keys: {}".format(logs))

    def on_train_batch_begin(self, batch, logs=None):
        # keys = list(logs.keys())
        print("...Training: start of batch {}; got log keys: {}".format(batch, logs))

    def on_train_batch_end(self, batch, logs=None):
        # keys = list(logs.keys())
        print("...Training: end of batch {}; got log keys: {}".format(batch, logs))

    def on_test_batch_begin(self, batch, logs=None):
        # keys = list(logs.keys())
        print("...Evaluating: start of batch {}; got log keys: {}".format(batch, logs))

    def on_test_batch_end(self, batch, logs=None):
        # keys = list(logs.keys())
        print("...Evaluating: end of batch {}; got log keys: {}".format(batch, logs))

    def on_predict_batch_begin(self, batch, logs=None):
        # keys = list(logs.keys())
        print("...Predicting: start of batch {}; got log keys: {}".format(batch, logs))

    def on_predict_batch_end(self, batch, logs=None):
        # keys = list(logs.keys())
        print("...Predicting: end of batch {}; got log keys: {}".format(batch, logs))

让我们尝试运行一下：

In [None]:
model = get_model()
model.fit(
    x_train,
    y_train,
    batch_size=128,
    epochs=1,
    verbose=0,
    validation_split=0.5,
    callbacks=[CustomCallback()],
)

res = model.evaluate(
    x_test, y_test, batch_size=128, verbose=0, callbacks=[CustomCallback()]
)

res = model.predict(x_test, batch_size=128, callbacks=[CustomCallback()])

### logs字典的用法

`logs`字典包含损失值以及批量或epoch末尾的所有指标，示例包括损失和平均绝对误差。

In [None]:
class LossAndErrorPrintingCallback(keras.callbacks.Callback):
    def on_train_batch_end(self, batch, logs=None):
        print("For batch {}, loss is {:7.2f}.".format(batch, logs["loss"]))

    def on_test_batch_end(self, batch, logs=None):
        print("For batch {}, loss is {:7.2f}.".format(batch, logs["loss"]))

    def on_epoch_end(self, epoch, logs=None):
        print(
            "The average loss for epoch {} is {:7.2f} "
            "and mean absolute error is {:7.2f}.".format(
                epoch, logs["loss"], logs["mean_absolute_error"]
            )
        )


model = get_model()
model.fit(
    x_train,
    y_train,
    batch_size=128,
    epochs=2,
    verbose=0,
    callbacks=[LossAndErrorPrintingCallback()],
)

res = model.evaluate(
    x_test,
    y_test,
    batch_size=128,
    verbose=0,
    callbacks=[LossAndErrorPrintingCallback()],
)

### self.model属性的用法

除了在调用其中一种方法时接收日志信息外，回调还可以访问与当前一轮训练/评估/预测相关的模型： `self.model`。

以下是你可以在回调中使用`self.model`进行的一些操作：

+ 设置`self.model.stop_training = True`可以立即中断训练。
+ 修改优化器的超参数（当`self.model.optimizer`可用时），例如`self.model.optimizer.learning_rate`。
+ 定期保存模型。
+ 在每个epoch结束时，在一些测试样本上记录`model.predict()`的输出，用于在训练期间用作健壮性检查。
+ 在每个epoch结束时提取中间特征的可视化，以监视模型随时间推移的正在学习的内容。
+ 等等

让我们在几个示例了解上述操作。

### Keras回调应用示例

#### 在最小的损失下尽早停止

第一个示例展示了在达到最小损失时，如何通过`Callback`设置属性`self.model.stop_training`（boolean）来停止训练。（可选）你可以提供一个参数`patience`来指定在达到局部最小值后，我们应该在停止之前等待多少个epoch。

[`tf.keras.callbacks.EarlyStopping`](https://tensorflow.google.cn/api_docs/python/tf/keras/callbacks/EarlyStopping)提供了更完整和通用的实现。

In [None]:
import numpy as np


class EarlyStoppingAtMinLoss(keras.callbacks.Callback):
    """当损失达到最小时停止训练，即损失停止减少

  参数:
      patience: 达到最小后要等待的epoch数。在此数目没有改善后，训练将停止。
  """

    def __init__(self, patience=0):
        super(EarlyStoppingAtMinLoss, self).__init__()
        self.patience = patience
        # best_weights用于存储发生最小损失的权重
        self.best_weights = None

    def on_train_begin(self, logs=None):
        # 损失不再最小时已等待的epoch数
        self.wait = 0
        # 训练停止的epoch
        self.stopped_epoch = 0
        # 将最佳状态初始化为无穷大
        self.best = np.Inf

    def on_epoch_end(self, epoch, logs=None):
        current = logs.get("loss")
        if np.less(current, self.best):
            self.best = current
            self.wait = 0
            # 如果当前结果更好，则记录最佳权重
            self.best_weights = self.model.get_weights()
        else:
            self.wait += 1
            if self.wait >= self.patience:
                self.stopped_epoch = epoch
                self.model.stop_training = True
                print("Restoring model weights from the end of the best epoch.")
                self.model.set_weights(self.best_weights)

    def on_train_end(self, logs=None):
        if self.stopped_epoch > 0:
            print("Epoch %05d: early stopping" % (self.stopped_epoch + 1))


model = get_model()
model.fit(
    x_train,
    y_train,
    batch_size=64,
    steps_per_epoch=5,
    epochs=30,
    verbose=0,
    callbacks=[LossAndErrorPrintingCallback(), EarlyStoppingAtMinLoss()],
)

#### 学习率策略

在此示例中，我们展示了在训练过程中如何使用自定义的回调来动态更改优化器的学习率。

有关更一般的实现，请参见[`callbacks.LearningRateScheduler`](https://tensorflow.google.cn/api_docs/python/tf/keras/callbacks/LearningRateScheduler)。

In [None]:
class CustomLearningRateScheduler(keras.callbacks.Callback):
    """学习率调度器，根据策略设置学习率

  参数:
      策略: 该函数以epoch索引（整数，从0开始的
      索引）和当前学习率作为输入，并返回新的学习率作为输出（浮点数）
  """

    def __init__(self, schedule):
        super(CustomLearningRateScheduler, self).__init__()
        self.schedule = schedule

    def on_epoch_begin(self, epoch, logs=None):
        if not hasattr(self.model.optimizer, "lr"):
            raise ValueError('Optimizer must have a "lr" attribute.')
        # 从模型的优化器获取当前学习率
        lr = float(tf.keras.backend.get_value(self.model.optimizer.learning_rate))
        # 调用策略功能以获取计划的学习率
        scheduled_lr = self.schedule(epoch, lr)
        # 在此epoch开始之前，将值设置回优化器
        tf.keras.backend.set_value(self.model.optimizer.lr, scheduled_lr)
        print("\nEpoch %05d: Learning rate is %6.4f." % (epoch, scheduled_lr))


LR_SCHEDULE = [
    # （开始epoch值，学习率）元组
    (3, 0.05),
    (6, 0.01),
    (9, 0.005),
    (12, 0.001),
]


def lr_schedule(epoch, lr):
    """辅助方法可根据epoch检索计划的学习率"""
    if epoch < LR_SCHEDULE[0][0] or epoch > LR_SCHEDULE[-1][0]:
        return lr
    for i in range(len(LR_SCHEDULE)):
        if epoch == LR_SCHEDULE[i][0]:
            return LR_SCHEDULE[i][1]
    return lr


model = get_model()
model.fit(
    x_train,
    y_train,
    batch_size=64,
    steps_per_epoch=5,
    epochs=15,
    verbose=0,
    callbacks=[
        LossAndErrorPrintingCallback(),
        CustomLearningRateScheduler(lr_schedule),
    ],
)

#### 内置Keras回调

请务必阅读[API文档](https://tensorflow.google.cn/api_docs/python/tf/keras/callbacks/)，以检查现有的Keras回调。应用包括记录到CSV，保存模型，在TensorBoard中可视化指标等等！