In [None]:
import tensorflow as tf
import numpy as np
from tensorflow import keras

In [None]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
#总计20640个样本，每个样本8个属性表示，以及房价作为target，所有属性值均为number
#目标变量：平均房屋价值
#输入变量（特征）：平均收入、住房平均年龄、平均房间、平均卧室、人口、平均占用、纬度和经度

X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

In [None]:
input_shape = X_train.shape[1:]

In [None]:
#创建一个函数重新创建已配置的损失函数，阈值默认值为1.0
def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss  = threshold * tf.abs(error) - threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

### Custom layers 自定义层

作用：构建包含独特层的架构、重复的架构（其中包含重复多次的相同层块，将每个层块视为一个层会很方便）

#### 创建不带任何权重的自定义层

#### 当预测值拥有非常不同的标度如0.01、10、1000时，可在回归模型的输出层使用指数层

keras.layers.core.Lambda(function, output_shape=None, mask=None, arguments=None)：可把任意的一个表达式作为一个“Layer”对象。

例如：#添加一个 x -> x^2 层
model.add(Lambda(lambda x: x ** 2))

function：要实现的函数，该函数仅接受一个变量，即神经网络上一层的输出

output_shape：函数应该返回的值的shape，可以是一个tuple或一个根据输入shape计算输出shape的函数  

mask: 掩膜    arguments：可选，是字典格式，用来传参

In [None]:
#创建一个函数并将其包装在keras.layers.Lambda中，使其成为一个层
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

In [None]:
exponential_layer([-1., 0., 1.])
#得到（exp（-1），exp（0），exp（1））

<tf.Tensor: shape=(3,), dtype=float32, numpy=array([0.36787945, 1.        , 2.7182817 ], dtype=float32)>

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="relu", input_shape=input_shape),
    keras.layers.Dense(1),
    exponential_layer
])
model.compile(loss="mse", optimizer="sgd")
model.fit(X_train_scaled, y_train, epochs=5,
          validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


0.3586341142654419

#### 构建具有权重的层（有状态层）

keras.activations.get（）：特定于tf.keras,将激活函数转换为适当的激活函数

在tf.keras中形状是tf.TensorShape类的实例，可以使用as_list()将其转换为Python列表

keras.activations.serialize()：保存激活函数的完整配置

In [None]:
class MyDense(keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        """
        将所有超参数用作参数（units和activation）；
        units:MLP里面对应隐层的神经元个数
        **kwargs参数调用父类的构造函数并传递给kwargs，处理标准参数(如input_shape)；
        将超参数保存为属性，指定特定的激活函数
        """
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)
        #self.activation = keras.layers.Activation(activation)
    def build(self, batch_input_shape):
        """
        为每个权重调用add_weight()来创建层的变量，首次使用该层时调用build（）方法
        """
        #创建权重连接矩阵kernel
        #知道上一层神经元的数量（输入的最后一个维度大小）
        self.kernel = self.add_weight(
            name="kernel", shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal")
        #创建偏置项，将其初始化为0
        self.bias = self.add_weight(
            name="bias", shape=[self.units], initializer="zeros")
        #在最后：调用父类build（）方法，告诉keras这一层被构建了（设置self.build=True）
        super().build(batch_input_shape) 
    def call(self, X):
        """执行所需的操作，获得层的输出"""
        return self.activation(X @ self.kernel + self.bias)
    def compute_output_shape(self, batch_input_shape):
        """
        返回该层输出的形状（形状与输入形状相同，最后一个维度被替换为该层神经元数量）;
        一般可省略，因为tf.keras会自动推断输出形状，但当构建动态层时必须使用
        """
        return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "units": self.units,
                "activation": keras.activations.serialize(self.activation)}

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
model = keras.models.Sequential([
    MyDense(30, activation="relu", input_shape=input_shape),
    MyDense(1)
])
model.compile(loss="mse", optimizer="nadam")
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test)

Epoch 1/2
Epoch 2/2


0.5473727583885193

In [None]:
model.save("my_model_with_a_custom_layer.h5")
model = keras.models.load_model("my_model_with_a_custom_layer.h5",
                                custom_objects={"MyDense": MyDense})

#### 创建一个具有多个输入（如Concatenate）的层

In [None]:
class MyMultiLayer(keras.layers.Layer):
    """
    只能使用函数式和子类API，顺序API仅接受具有一个输入和一个输出的层，
    对多个输入进行处理
    """
    def call(self, X):
        """参数包含所有输入的元组（2个），返回输出（2个）列表"""
        X1, X2 = X
        print("X1.shape: ", X1.shape ," X2.shape: ", X2.shape) # 方便debug
        return X1 + X2, X1 * X2

    def compute_output_shape(self, batch_input_shape):
        """参数包含每个输入的批处理形状的元组，返回批处理输出形状的列表"""
        batch_input_shape1, batch_input_shape2 = batch_input_shape
        return [batch_input_shape1, batch_input_shape2]

In [None]:
inputs1 = keras.layers.Input(shape=[2])
inputs2 = keras.layers.Input(shape=[2])
outputs1, outputs2 = MyMultiLayer()((inputs1, inputs2))
#batch size未指定，所以是None

X1.shape:  (None, 2)  X2.shape:  (None, 2)


In [None]:
def split_data(data):
    """把输入分成两半，每个输入有4个特征"""
    columns_count = data.shape[-1]
    half = columns_count // 2
    return data[:, :half], data[:, half:]

X_train_scaled_A, X_train_scaled_B = split_data(X_train_scaled)
X_valid_scaled_A, X_valid_scaled_B = split_data(X_valid_scaled)
X_test_scaled_A, X_test_scaled_B = split_data(X_test_scaled)

X_train_scaled_A.shape, X_train_scaled_B.shape

((11610, 4), (11610, 4))

In [None]:
outputs1, outputs2 = MyMultiLayer()((X_train_scaled_A, X_train_scaled_B))

X1.shape:  (11610, 4)  X2.shape:  (11610, 4)


#### 用Functional API构建完整的模型

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

input_A = keras.layers.Input(shape=X_train_scaled_A.shape[-1])
input_B = keras.layers.Input(shape=X_train_scaled_B.shape[-1])

hidden_A, hidden_B = MyMultiLayer()((input_A, input_B))#对两个输入进行处理

#处理后构成一个完整的隐藏层
hidden_A = keras.layers.Dense(30, activation='selu')(hidden_A)
hidden_B = keras.layers.Dense(30, activation='selu')(hidden_B)

concat = keras.layers.Concatenate()((hidden_A, hidden_B))
output = keras.layers.Dense(1)(concat)
model = keras.models.Model(inputs=[input_A, input_B], outputs=[output])

X1.shape:  (None, 4)  X2.shape:  (None, 4)


In [None]:
model.compile(loss='mse', optimizer='nadam')
model.fit((X_train_scaled_A, X_train_scaled_B), y_train, epochs=2,
          validation_data=((X_valid_scaled_A, X_valid_scaled_B), y_valid))
model.evaluate((X_test_scaled_A,X_test_scaled_B), y_test)

Epoch 1/2
X1.shape:  (None, 4)  X2.shape:  (None, 4)
X1.shape:  (None, 4)  X2.shape:  (None, 4)
Epoch 2/2


0.9515503644943237

#### 构建在训练期间（用于正则化）添加高斯噪声但在测试期间不执行任何操作的层

具有相同功能的层：keras.layers.GaussianNoise

In [None]:
class AddGaussianNoise(keras.layers.Layer):
    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev#标准差

    def call(self, X, training=None):
        if training:
            #在训练时加入噪声
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        else:
            return X

    def compute_output_shape(self, batch_input_shape):
        return batch_input_shape

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
model = keras.models.Sequential([
    AddGaussianNoise(stddev=1.0),
    keras.layers.Dense(30, activation="selu"),
    keras.layers.Dense(1)
])
model.compile(loss="mse", optimizer="nadam")
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test)

Epoch 1/2
Epoch 2/2


0.7559615969657898