In [None]:
#自定义损失函数
import tensorflow as tf
from tensorflow import keras
def huber_fn(y_true, y_pred):
    error = y_true - y_pred
    is_small_error = tf.abs(error) < 1
    squared_loss = tf.square(error) / 2
    linear_loss = tf.abs(error) - 0.5
    return tf.where(is_small_error, squared_loss, linear_loss)
model = keras.models.load_model("my_model_with_a_custom_loss.h5", custom_objects={"huber_fn": huber_fn})
model.compile(loss=huber_fn, optimizer="nadam")
model.fit(X_train, y_train, [...])

def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss  = threshold * tf.abs(error) - threshold ** 2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn
model.compile(loss=create_huber(2.0), optimizer="nadam")

model = keras.models.load_model("my_model_with_a_custom_loss_threshold_2.h5",custom_objects={"huber_fn": create_huber(2.0)})

class HuberLoss(keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__( **kwargs)
    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss  = self.threshold * tf.abs(error) - self.threshold ** 2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    def get_config(self):
        base_config = super().get_config()
        return { **base_config, "threshold": self.threshold}

#构造函数接受**kwargs并将它们传递给父类构造函数，该父类构造函数处理标准超参数：损失的name和用于聚合单个实例损失的reduction算法。
#call（）方法获取标签和预测，计算所有实例损失，然后将其返回
#·get_conig（）方法返回一个字典，将每个超参数名称映射到其值。它首先调用父类的get_conig（）方法，然后将新的超参数添加到此字典中

model.compile(loss=HuberLoss(2.), optimizer="nadam")
model = keras.models.load_model("my_model_with_a_custom_loss_class.h5",custom_objects={"HuberLoss": HuberLoss})

In [None]:
#自定义激活函数、初始化、正则化和约束
def my_softplus(z): # return value is just tf.nn.softplus(z)
    return tf.math.log(tf.exp(z) + 1.0)
def my_glorot_initializer(shape, dtype=tf.float32):
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)
def my_l1_regularizer(weights):
    return tf.reduce_sum(tf.abs(0.01 * weights))
def my_positive_weights(weights): # return value is just tf.nn.relu(weights)
    return tf.where(weights < 0., tf.zeros_like(weights), weights)
layer = keras.layers.Dense(30, activation=my_softplus, kernel_initializer=my_glorot_initializer,kernel_regularizer=my_l1_regularizer,kernel_constraint=my_positive_weights)

class MyL1Regularizer(keras.regularizers.Regularizer):
    def __init__(self, factor):
        self.factor = factor
    def __call__(self, weights):
        return tf.reduce_sum(tf.abs(self.factor * weights))
    def get_config(self):
        return {"factor": self.factor}

model.compile(loss="mse", optimizer="nadam",metrics=[create_huber(2.0)])

class HuberMetric(keras.metrics.Metric):
    def __init__(self, threshold=1.0,**kwargs):
        super().__init__( kwargs) # handles base args (e.g., dtype)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        self.total = self.add_weight("total", initializer="zeros")
        self.count = self.add_weight("count", initializer="zeros")
    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
    def result(self):
        return self.total / self.count
    def get_config(self):
        base_config = super().get_config()
        return { **base_config, "threshold": self.threshold}

In [None]:
#自定义层
#以下层将对它的输入应用指数函数
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

#以下类实现了Dense层的简化版本
class MyDense(keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        super().__init__( **kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)
    def build(self, batch_input_shape):
        self.kernel = self.add_weight(name="kernel", shape=[batch_input_shape[-1], self.units], initializer="glorot_normal")
        self.bias = self.add_weight(name="bias", shape=[self.units], initializer="zeros")
        super().build(batch_input_shape) # must be at the end
    def call(self, X):
        return self.activation(X @ self.kernel + self.bias)
    def compute_output_shape(self, batch_input_shape):
        return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])
    def get_config(self):
        base_config = super().get_config()
        return { **base_config, "units": self.units,
                "activation": keras.activations.serialize(self.activation)}

class MyMultiLayer(keras.layers.Layer):
    def call(self, X):
        X1, X2 = X
        return [X1 + X2, X1 * X2, X1 / X2]
    def compute_output_shape(self, batch_input_shape):
        b1, b2 = batch_input_shape
        return [b1, b1, b1] # should probably handle broadcasting rules

class MyGaussianNoise(keras.layers.Layer):
    def __init__(self, stddev, **kwargs):
        super().__init__( **kwargs)
        self.stddev = stddev
    def call(self, X, training=None):
        if training:
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        else:
            return X
    def compute_output_shape(self, batch_input_shape):
        return batch_input_shape

class ResidualBlock(keras.layers.Layer):
    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__( **kwargs)
        self.hidden = [keras.layers.Dense(n_neurons, activation="elu",
                                          kernel_initializer="he_normal")
                       for _ in range(n_layers)]
    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        return inputs + Z

In [None]:
class ResidualRegressor(keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__( **kwargs)
        self.hidden1 = keras.layers.Dense(30,activation="elu", kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = keras.layers.Dense(output_dim)
    def call(self, inputs):
        Z = self.hidden1(inputs)
        for _ in range(1 + 3):
            Z = self.block1(Z)
        Z = self.block2(Z)
        return self.out(Z)

class ReconstructingRegressor(keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__( **kwargs)
        self.hidden = [keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal")for _ in range(5)]
        self.out = keras.layers.Dense(output_dim)
    def build(self, batch_input_shape):
        n_inputs = batch_input_shape[-1]
        self.reconstruct = keras.layers.Dense(n_inputs)
        super().build(batch_input_shape)
    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
        self.add_loss(0.05 * recon_loss)
        return self.out(Z)

In [1]:
#autodiff 自动微分计算梯度

def f(w1, w2):
    return 3 * w1 ** 2 + 2 * w1 * w2
#通过在调整相应参数时测量函数输出的变化来计算每个偏导数的近似值
w1, w2, eps = 5, 3, 1e-6
(f(w1 + eps, w2) - f(w1, w2)) / eps, (f(w1, w2 + eps) - f(w1, w2)) / eps

(36.000003007075065, 10.000000003174137)

In [3]:
#使用TensorFlow自动微分
import tensorflow as tf
w1, w2 = tf.Variable(5.), tf.Variable(3.)
#创建一个t.GradientTape上下文，该上下文将自动记录涉及变量的每个操作
with tf.GradientTape() as tape:
    z = f(w1, w2)
gradients = tape.gradient(z, [w1, w2])

#调用tape的gradient（）方法后，tape会立即被自动擦除，因此如果你尝试两次调用gradient（），则会出现异常

#如果你需要多次调用gradient()，则必须使该tape有持久性并在每次使用完该tape后将其删除以释放资源
with tf.GradientTape(persistent=True) as tape:
    z = f(w1, w2)
dz_dw1 = tape.gradient(z, w1) # => tensor 36.0
dz_dw2 = tape.gradient(z, w2) # => tensor 10.0, works fine now
del tape

c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
    z = f(c1, c2)
gradients = tape.gradient(z, [c1, c2]) # returns [None, None]
#可以强制tape观察任何张量，来记录涉及它们的所有操作
with tf.GradientTape() as tape:
    tape.watch(c1)
    tape.watch(c2)
    z = f(c1, c2)
gradients = tape.gradient(z, [c1, c2]) # returns [tensor 36., tensor 10.]





[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [None]:
def f(w1, w2):
    return 3*w1**2+tf.stop_gradient(2*w1*w2)
with tf.GradientTape() as tape:
    z=f(w1, w2)  # same result as without stop_gradient()
gradients=tape.gradient(z, [w1, w2]) # returns [tensor 30., None]
#使用自动微分计算此函数的梯度会导致一些数值上的困难：由于浮点精度误差，自动微分最终导致计算无穷除以无穷（返回NaN）

#我们可以告诉TensorFlow在计算my_so tplus()函数的梯度时使用@t.custom_gradient来修饰它并使它既返回其正常输出又返回计算导数的函数（注意，它将接收到目前为止反向传播的梯度，直到sotplus函数。据链式规则，我们应该将它们乘以该函数的梯度）
@tf.custom_gradient
def my_better_softplus(z):
    exp = tf.exp(z)
    def my_softplus_gradients(grad):
        return grad / (1 + 1 / exp)
    return tf.math.log(exp + 1), my_softplus_gradients

In [4]:
#自定义训练循环
import numpy as np
from tensorflow import keras
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal", kernel_regularizer=l2_reg),
    keras.layers.Dense(1, kernel_regularizer=l2_reg)
])

def random_batch(X, y, batch_size=32):
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]

def print_status_bar(iteration, total, loss, metrics=None):
    metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result())
                         for m in [loss] + (metrics or [])])
    end = "" if iteration < total else "\n"
    print("\r{}/{} - ".format(iteration, total) + metrics,
          end=end)




In [None]:
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = keras.optimizers.Nadam(lr=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]

for epoch in range(1, n_epochs + 1):
    print("Epoch {}/{}".format(epoch, n_epochs))
    for step in range(1, n_steps + 1):
        X_batch, y_batch = random_batch(X_train_scaled, y_train)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    for metric in [mean_loss] + metrics:
        metric.reset_states()
#在上述代码中，我们创建了两个嵌套循环：一个用于轮次，另一个用于轮次内的批处理，然后从训练集中抽取一个随机批次。在t.GradientTape（）块中，我们对一个批次进行了预测（使用模型作为函数），并计算了损失。接下来，我们要求tape针对每个可训练变量（不是所有变量！）计算损失的梯度，然后用优化器来执行“梯度下降”步骤。然后，我们更新平均损失和指标（在当前轮次内），并显示状态栏。在每个轮次结束时，我们再次显示状态栏以使其看起来完整并打印换行符，然后重置平均损失和指标的状态。
for variable in model.variables:
    if variable.constraint is not None:
        variable.assign(variable.constraint(variable))
