# 12.3 モデルと訓練アルゴリズムのカスタマイズ

### セットアップ

In [None]:
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)

# Scikit-Learn ≥0.20 is required
import sklearn
assert sklearn.__version__ >= "0.20"

try:
    # %tensorflow_version only exists in Colab.
    %tensorflow_version 2.x
except Exception:
    pass

# TensorFlow ≥2.4 is required in this notebook
# Earlier 2.x versions will mostly work the same, but with a few bugs
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.4"

# Common imports
import numpy as np
import os

# to make this notebook's output stable across runs
np.random.seed(42)
tf.random.set_seed(42)

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "deep"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

In [None]:
print('TensorFlow', tf.__version__)

## 12.3.1　Custom loss function（カスタム損失関数）

### 準備

まずはCalifornia housing datasetを読込んで準備しよう。<br>
読込んでからtrain, validation, testデータに分割し、スケーリングする。

In [None]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

### カスタム損失関数の定義（フーバー関数）

### フーバー関数

#### 定義
$$ L_{\delta}(a) = 
    \begin{cases}
        \frac{1}{2}a^{2} & |a| \leq \delta \\
        \delta|a| - \frac{1}{2}\delta^{2} & |a| > \delta \\
    \end{cases}
$$
※ $\delta$はしきい値、$a$は損失$y-f(x)$を想定。

#### 性質
* （平均二乗誤差関数より）ロバストかつスムーズな関数。以下の損失関数の欠点を補うために両者のいいとこ取りをしている。
    - 平均二乗誤差関数はスケールが2乗になるため外れ値に影響を受けやすい。
    - 平均絶対誤差関数は原点が微分不可能であり原点近くでも勾配が大きいまま。

スムーズであることは以下のように確認できる。<br>
$$ L_{\delta}^{'}(a) = 
    \begin{cases}
        a & |a| \leq \delta \\
        |\delta| & |a| > \delta \\
    \end{cases}
$$
なので$ L_{\delta}^{'}(a) = \delta$。よって$a = \delta$で微分可能。

$\delta = 1.0$の時のフーバー関数を描画してみよう。

In [None]:
def huber_fn(y_true, y_pred):
    error = y_true - y_pred
    is_small_error = tf.abs(error) < 1
    squared_loss = tf.square(error) / 2
    linear_loss  = tf.abs(error) - 0.5
    return tf.where(is_small_error, squared_loss, linear_loss)

In [None]:
def show_huber_fn(huber_fn):
    plt.figure(figsize=(8, 3.5))
    z = np.linspace(-4, 4, 200)
    plt.plot(z, huber_fn(0, z), "b-", linewidth=2, label="huber($z$)")
    plt.plot(z, z**2 / 2, "b:", linewidth=1, label=r"$\frac{1}{2}z^2$")
    plt.plot([-1, -1], [0, huber_fn(0., -1.)], "r--")
    plt.plot([1, 1], [0, huber_fn(0., 1.)], "r--")
    plt.gca().axhline(y=0, color='k')
    plt.gca().axvline(x=0, color='k')
    plt.axis([-4, 4, 0, 4])
    plt.grid(True)
    plt.xlabel("$z$")
    plt.legend(fontsize=14)
    plt.title("Huber loss", fontsize=14)
    plt.show()

show_huber_fn(huber_fn)

### カスタム損失関数の実装

フーバー関数を損失関数として使用してみよう。

まずはモデルを準備する。

In [None]:
input_shape = X_train.shape[1:]

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1),
])

カスタムした損失関数もKeras APIの損失関数と同じようにコンパイル時のloss引数に関数オブジェクトを渡す。

In [None]:
model.compile(loss=huber_fn, optimizer="nadam", metrics=["mae"])

In [None]:
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

In [None]:
model.evaluate(X_test, y_test)

以上がカスタム損失関数の使い方である。

## 12.3.2 カスタムオブジェクトを含むモデルのセーブ/ロード

まずはさきほど作成したモデルを保存しよう。保存時はこれまで変わりはない。

In [None]:
model.save("my_model_with_a_custom_loss.h5")

一方ロード時はカスタムオブジェクトの考慮が必要である。<br>
save時にlossなどに使用した関数名を保存するが、その関数のモジュールは保存しない。<br>
（実際my_model_with_a_custom_loss.h5をみるとlossの部分は`"loss": "huber_fn"`だけが保存されている。）

そこで、ロード時にはHDF5に保存した関数名と関数オブジェクトをマッピングした辞書を渡すことでloss関数を紐づける。

In [None]:
model = keras.models.load_model("my_model_with_a_custom_loss.h5",
                                custom_objects={"huber_fn": huber_fn})

In [None]:
# model.fit(X_train_scaled, y_train, epochs=2,
#           validation_data=(X_valid_scaled, y_valid))

In [None]:
model.evaluate(X_test, y_test)

では次にカスタムオブジェクトの設定（引数）ごと保存したい場合を紹介する。

まずはフーバー関数をインスタンスごとにしきい値を設定できるようにする。

In [None]:
def create_huber(threshold=1.0):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss  = threshold * tf.abs(error) - threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

これでしきい値を設定できるようになった。<br>
試しにしきい値を0.5, 1.0, 2.0に設定したフーバー関数のグラフを比較してみよう。

In [None]:
show_huber_fn(create_huber(0.5))
show_huber_fn(create_huber(1.0))
show_huber_fn(create_huber(2.0))

モデルへのloss関数の渡し方はさきほど同じ。

In [None]:
model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=["mae"])

In [None]:
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

In [None]:
model.evaluate(X_test, y_test)

In [None]:
model.save("my_model_with_a_custom_loss_threshold_2.h5")

ロード時の注意としては、先述した通り"huber_fn"は学習時の関数オブジェクトではなく、ロード時にマッピングした関数オブジェクトであることに注意しよう。<br>
つまり**カスタムオブジェクトの設定は保存されない**。よって以下のように学習時と違う関数を紐づけることが出来てしまう。

In [None]:
model = keras.models.load_model("my_model_with_a_custom_loss_threshold_2.h5",
                                custom_objects={"huber_fn": create_huber(5.0)})

In [None]:
model.evaluate(X_test, y_test)

元の損失関数と異なるため、同じテストデータに対して精度評価を行った際にはロード後のモデルの方が損失が大きくなっている。

この場合はmodel.saveの外側で設定を管理しておく必要がある。<br>
（例えば素朴なアイデアとしては huber_fn = create_huber(2.0)などとして同じオブジェクトを使うようにする。しきい値を定数として定義しておくなど）

実はこの問題もカスタム損失関数クラスの実装を改良することで解消できる。

keras.losses.Loss クラスのサブクラスを作り、その get_config() メソッドを使用するようにする。

In [None]:
class HuberLoss(keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):  # 特殊メソッド__call__じゃなくてよい？kerasの場合はcallを使うのか？
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss  = self.threshold * tf.abs(error) - self.threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
        
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

簡単にソースを解説しよう。

■ インスタンス生成時<br>
* 引数はthresholdとスーパークラスの引数であるreductionとnameをとる。reductionとnameはスーパークラスのコンストラクタに渡される。<br>
いずれもデフォルト値を持つので引数を渡さなくてもよい。<br>
    ⇒Lossクラスの説明：https://www.tensorflow.org/api_docs/python/tf/keras/losses/Loss

■ 学習時<br>
* callメソッドが呼ばれ、損失を計算して返す。（これまでの処理をこの中に書く）

■ モデル保存時<br>
* get_config()メソッドは個々のハイパーパラメータ名とその値をマッピングしたディクショナリを返す。
* kerasは損失関数のインスタンスのget_config()メソッドを呼出し、HDF5ファイルにハイパーパラメータと値のディクショナリをJSON形式で保存する。

■モデル読込み時<br>
* kerasは損失関数のインスタンスのfrom_config()メソッドを呼出し、HDF5ファイルに保存されたハイパーパラメータと値のディクショナリを読込んで設定ごと取り込む。
* from_config()メソッドはスーパークラスを使用するため、（個別処理が不要であれば）カスタム損失関数には実装不要である。<br>
    ⇒from_configの説明：https://www.tensorflow.org/api_docs/python/tf/keras/losses/Loss#from_config

In [None]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1),
])

In [None]:
model.compile(loss=HuberLoss(2.), optimizer="nadam", metrics=["mae"])

In [None]:
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

In [None]:
model.evaluate(X_test, y_test)

In [None]:
model.save("my_model_with_a_custom_loss_class.h5")

In [None]:
model = keras.models.load_model("my_model_with_a_custom_loss_class.h5",
                                custom_objects={"HuberLoss": HuberLoss})

my_model_with_a_custom_loss_class.h5から該当箇所を抜き出すと以下のようになっている

```JSON
"loss": {   
            "class_name": "HuberLoss", 
            "config":   {   
                            "reduction": "auto", 
                            "name": null, 
                            "threshold": 2.0
                        }
        }
```

In [None]:
# model.fit(X_train_scaled, y_train, epochs=2,
#           validation_data=(X_valid_scaled, y_valid))

In [None]:
model.evaluate(X_test, y_test)

In [None]:
model.loss.threshold

## 12.3.3 カスタム活性化関数、初期化子、正則化器、制約

損失関数、正則化器、制約、初期化子、指標、活性化関数、レイヤはもとよりモデル全体といった Keras が提供している機能の大半は、同じような方法でカスタマイズできる。

多くの場合は適切な入出力をとる関数を定義するだけで済む。<br>
「適切な入出力」は公式ドキュメント参照：
* 損失関数：[tf.keras.losses.Loss](https://www.tensorflow.org/api_docs/python/tf/keras/losses/Loss)
* 正則化器：[tf.keras.regularizers.Regularizer](https://www.tensorflow.org/api_docs/python/tf/keras/regularizers/Regularizer)
* 制約：[tf.keras.constraints.Constraint](https://www.tensorflow.org/api_docs/python/tf/keras/constraints/Constraint)
* 初期化子：[tf.keras.initializers.Initializer](https://www.tensorflow.org/api_docs/python/tf/keras/initializers/Initializer)
* 指標：[tf.keras.metrics.Metric](https://www.tensorflow.org/api_docs/python/tf/keras/metrics/Metric)
* 活性化関数：ドキュメント見つからず（※単に前層からの出力を受取って後続層に渡す関数であればなんでもよいのかも？）
* レイヤ：[tf.keras.layers.Layer](https://www.tensorflow.org/api_docs/python/tf/keras/layers/Layer)
* モデル：[tf.keras.Model](https://www.tensorflow.org/api_docs/python/tf/keras/Model)

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
# カスタム活性化関数
def my_softplus(z): # return value is just tf.nn.softplus(z)
    return tf.math.log(tf.exp(z) + 1.0)

# カスタム重み初期化
def my_glorot_initializer(shape, dtype=tf.float32):
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

# カスタム正則化
def my_l1_regularizer(weights):
    return tf.reduce_sum(tf.abs(0.01 * weights))

# カスタム制約
def my_positive_weights(weights): # return value is just tf.nn.relu(weights)
    return tf.where(weights < 0., tf.zeros_like(weights), weights)

In [None]:
layer = keras.layers.Dense(1, activation=my_softplus,
                           kernel_initializer=my_glorot_initializer,
                           kernel_regularizer=my_l1_regularizer,
                           kernel_constraint=my_positive_weights)

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

上記4つのカスタムオブジェクトはレイヤに設定するものである。<br>
よって、レイヤ生成時に渡す。

In [None]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape, name="input"),
    keras.layers.Dense(1, activation=my_softplus,
                       kernel_regularizer=my_l1_regularizer,
                       kernel_constraint=my_positive_weights,
                       kernel_initializer=my_glorot_initializer,
                       name="hidden_1"),
])

In [None]:
model.compile(loss="mse", optimizer="nadam", metrics=["mae"])

In [None]:
model.summary()

In [None]:
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

In [None]:
model.save("my_model_with_many_custom_parts.h5")

my_model_with_many_custom_parts.h5を開くと以下のようにhidden_1レイヤの設定が保存されていることが分かる。
```json
{
    "class_name":"Dense",
    "config":{
        "name":"hidden_1",
        "trainable":true,
        "dtype":"float32",
        "units":1,
        "activation":"my_softplus",
        "use_bias":true,
        "kernel_initializer":"my_glorot_initializer",
        "bias_initializer":{
            "class_name":"Zeros",
            "config":{
                
            }
        },
        "kernel_regularizer":"my_l1_regularizer",
        "bias_regularizer":null,
        "activity_regularizer":null,
        "kernel_constraint":"my_positive_weights",
        "bias_constraint":null
    }
}
```

ロードもさきほどと同様に関数名と関数オブジェクトをマッピングした辞書を渡す。

In [None]:
model = keras.models.load_model(
    "my_model_with_many_custom_parts.h5",
    custom_objects={
       "my_l1_regularizer": my_l1_regularizer,
       "my_positive_weights": my_positive_weights,
       "my_glorot_initializer": my_glorot_initializer,
       "my_softplus": my_softplus,
    })

my_l1_regularizerは正則化の"度合い"を決めるハイパーパラメータをもつ。<br>
これを変更可能にするには以下のようにクラス化する。

In [None]:
class MyL1Regularizer(keras.regularizers.Regularizer):
    def __init__(self, factor):
        self.factor = factor
    def __call__(self, weights):
        return tf.reduce_sum(tf.abs(self.factor * weights))
    def get_config(self):
        return {"factor": self.factor}

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1, activation=my_softplus,
                       kernel_regularizer=MyL1Regularizer(0.01),
                       kernel_constraint=my_positive_weights,
                       kernel_initializer=my_glorot_initializer),
])

In [None]:
model.compile(loss="mse", optimizer="nadam", metrics=["mae"])

In [None]:
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

In [None]:
model.save("my_model_with_many_custom_parts.h5")

モデル読込み時の注意点はさきほどと同様なのでコメント省略

In [None]:
model = keras.models.load_model(
    "my_model_with_many_custom_parts.h5",
    custom_objects={
       "MyL1Regularizer": MyL1Regularizer,
       "my_positive_weights": my_positive_weights,
       "my_glorot_initializer": my_glorot_initializer,
       "my_softplus": my_softplus,
    })

In [None]:
model.evaluate(X_test, y_test)

## 12.3.4 カスタム指標

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1),
])

In [None]:
model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)])

In [None]:
model.fit(X_train_scaled, y_train, epochs=2)

さきほど作成したhuber関数を評価指標に使用しよう。その際注意がある。

**■ポイント**<br>
lossとmetricで同じ関数を使用することが出来るが、異なる結果が出ることがある。原因は以下：
* 数式は同等でも演算順序が同じではないため浮動小数点の精度による誤差。
* 計算の中でサンプルの重みの使い方の相違。
    - loss : バッチ損失の算術平均
    - metric : インスタンス損失の加重平均

実装としてはmetricに指定すればよい。

In [None]:
model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=[create_huber(2.0)])

In [None]:
sample_weight = np.random.rand(len(y_train))
history = model.fit(X_train_scaled, y_train, epochs=2, sample_weight=sample_weight)

ちなみに「損失 = メトリクス * サンプルの重みの平均 (および浮動小数点の精度誤差) 」を計算すると以下：

In [None]:
history.history["loss"][0], history.history["huber_fn"][0] * sample_weight.mean()

### ストリーミング指標

訓練中、kerasでは個々のバッチで指標を計算し、エポックの最初からのその指標の平均を管理する。<br>
しかし、以下に示す適合率（※）などは各バッチでの計算結果の算術平均をとるのではなく、要求されたタイミングでの最新の指標値を算出できる必要がある。<br>
この種の指標を**ストリーミング指標**という。

※正事例と予測したもののなかで真の値が正事例の割合を表す指標

In [None]:
precision = keras.metrics.Precision()
# 1バッチ目の想定
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1])  # 真陽性率4/5=0.8

In [None]:
# 2バッチ目の想定
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])  # 真陽性率(4+0)/(5+3)=0.5

任意のタイミングでresultメソッドを呼ぶとその時点での指標値を返却する。

In [None]:
precision.result()

variables属性ではそのオブジェクトが管理している変数を返却する。

In [None]:
precision.variables

reset_statesで変数をリセットできる。

In [None]:
precision.reset_states()

再びvariables属性を呼び出すと値がリセットされていることが分かる。

In [None]:
precision.variables

カスタムストリーミング指標の作成

ポイントは以下：
* keras.metrics.Metricを継承する
* 指標を計算する上で必要になる中間データはadd_weightメソッドで初期化する。
    - 今回は各時点でのフーバー損失の合計とインスタンス数を保持する。
* update_state() メソッドは、このクラスのインスタンスを関数として使ったときに呼び出される。assign_addメソッドで各中間データを更新する。
* result()メソッドは最終結果を計算して返す。このクラスのインスタンスを関数として使ったとき、まず update_state() メソッドが呼び出されてから result() メソッドが呼び出され、その結果が返される。
*  モデルとともにthresholdが保存されるように、get_config()メソッドも実装している。
* reset_states()メソッドのデフォルト実装は、すべての変数を0.0にリセットする（必要ならオーバーライドできる）。

In [None]:
class HuberMetric(keras.metrics.Metric):
    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs) # handles base args (e.g., dtype)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        self.total = self.add_weight("total", initializer="zeros")
        self.count = self.add_weight("count", initializer="zeros")
    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
    def result(self):
        return self.total / self.count
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

まずは関数として使用してみる。

In [None]:
m = HuberMetric(2.)

# total = 2 * |10 - 2| - 2²/2 = 14
# count = 1
# result = 14 / 1 = 14
m(tf.constant([[2.]]), tf.constant([[10.]])) 

In [None]:
# total = total + (|1 - 0|² / 2) + (2 * |9.25 - 5| - 2² / 2) = 14 + 7 = 21
# count = count + 2 = 3
# result = total / count = 21 / 3 = 7
m(tf.constant([[0.], [5.]]), tf.constant([[1.], [9.25]]))

m.result()

インスタンスが管理する変数を表示すると、totalとcountが定義されていることが分かる。

In [None]:
m.variables

リセットすると値がゼロになる。

In [None]:
m.reset_states()
m.variables

`HuberMetric`クラスの動作確認をしよう。

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1),
])

In [None]:
model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=[HuberMetric(2.0)])

In [None]:
model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2)

In [None]:
model.save("my_model_with_a_custom_metric.h5")

In [None]:
model = keras.models.load_model("my_model_with_a_custom_metric.h5",
                                custom_objects={"huber_fn": create_huber(2.0),
                                                "HuberMetric": HuberMetric})

In [None]:
# model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2)

In [None]:
model.evaluate(X_test, y_test)

`model.metrics[0]`には`HuberMetric`がいない場合があるもよう。<br>
どうやらlossもmetricとして追跡できるようになっているため、その関数が含まれているとのこと。<br>
よって`HuberMetric`にアクセスするには`model.metrics[-1]`とする。<br> 
(参照： [TF issue #38150](https://github.com/tensorflow/tensorflow/issues/38150))

In [None]:
model.metrics

In [None]:
model.metrics[-1].threshold

■HuberMetricクラスの実装簡略化

HuberMetricでは単に平均を更新するだけである。<br>
keras.metrics.Meanを継承することで計算処理部分をsuperクラスに任せることが出来、実装を簡略化できる。

In [None]:
class HuberMetric(keras.metrics.Mean):
    def __init__(self, threshold=1.0, name='HuberMetric', dtype=None):
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        super().__init__(name=name, dtype=dtype)
    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.huber_fn(y_true, y_pred)
        super(HuberMetric, self).update_state(metric, sample_weight)
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}        

このクラスは形状をより適切に処理し、サンプルの重みもサポートする。

※sample weightについて<br>
引用元：[例に重みの付いたニューラルネットワークの訓練](https://reference.wolfram.com/language/tutorial/NeuralNetworksExampleWeighting.html.ja?source=footer)<br>
> ある例を正しく分類する方が，別の例を分類するよりも重要なことがある．不正検出に使われる二項分類器を考えてみよう．false positivesは無害であるがfalse negativesは壊滅的である．訓練中にこれを避ける方法の一つとして，負例よりも正例に大きい重みを置くというものがある．

In [None]:
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1),
])

In [None]:
model.compile(loss=keras.losses.Huber(2.0), optimizer="nadam", weighted_metrics=[HuberMetric(2.0)])

In [None]:
# sample_weightを設定
sample_weight = np.random.rand(len(y_train))  # U(0,1)からランダムに生成しているので多分特に意味は無い重み付け

history = model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32),
                    epochs=2, sample_weight=sample_weight)

In [None]:
history.history["loss"][0], history.history["HuberMetric"][0] * sample_weight.mean()

In [None]:
model.save("my_model_with_a_custom_metric_v2.h5")

In [None]:
model = keras.models.load_model("my_model_with_a_custom_metric_v2.h5",
                                custom_objects={"HuberMetric": HuberMetric})

In [None]:
model.fit(X_train_scaled.astype(np.float32), y_train.astype(np.float32), epochs=2)

In [None]:
model.metrics[-1].threshold