In [1]:
import tensorflow as tf
from IPython.display import clear_output
import os
# tf.multiply(2,3)
clear_output()

> ## Customizing Models and Training Algorithms

>> ### Custom Loss Functions

In [2]:
def huber_fn(y_true, y_pred):
    error = tf.math.subtract(y_true, y_pred)
    is_small_error = tf.abs(error) < 1
    squared_loss = tf.math.square(error) / 2
    linear_loss = tf.math.abs(error) - 0.5

    return tf.where(is_small_error, squared_loss, linear_loss)

In [3]:
model = tf.keras.Sequential([
    tf.keras.layers.Dense(30)
])
model.compile(loss=huber_fn, optimizer="nadam")
clear_output()

### Saving and Loading Models That Contain Custom Components

In [4]:
# model = tf.keras.models.save_model("my_model_with_a_custom_loss.h5",
#                                     custom_objects={"huber_fn": huber_fn})

> Create a subclass of the tf.keras.losses.Loss class, and then implementing its get_config() method to solve problem.

In [5]:
class HuberLoss(tf.keras.losses.Loss):
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        error = tf.math.subtract(y_true - y_pred)
        is_small_error = tf.math.abs(error) < 1
        squared_loss = tf.math.square(error) / 2
        linear_loss = tf.math.multiply(self.threshold, tf.abs(error)) - tf.math.square(self.threshold) / 2
        return tf.where(is_small_error, squared_loss, linear_loss)

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

In [6]:
model.compile(loss=HuberLoss(2.), optimizer="nadam")
model.build(input_shape=([]))

In [7]:
model.save('my_model.h5')

In [8]:
# load_model = tf.keras.models.load_model('my_model.h5',
#                                         custom_objects={"HuberLoss": HuberLoss})

>> ### Custom Activation Functions, Initializers, Regularizers and Constraints

> If a function has hyperparameters that need to be saved along with the model, then you will want to subclass the appropriate.

In [9]:
class MyL1Regularizer(tf.keras.regularizers.Regularizer):
  def __init__(self, factor):
    self.factor = factor

  def __cal__(self, weights):
    return tf.reduce_sum(tf.abs(self.factor * weights))

  def get_config(self):
    return {"factor": self.factor}

### Custom Metrics

In [10]:
from tensorflow.python.ops.init_ops import Initializer
class HuberMetric(tf.keras.metrics.Metric):
  def __init__(self, threshold=1.0, **kwargs):
    super().__init__(**kwargs)   # Handle base arguments
    self.threshold = threshold
    self.huber_fn = self.create_huber(threshold)
    self.total = self.add_weight("total", initializer="zeros")
    self.count = self.add_weight("count", initializer="zeros")


  def update_state(self, y_true, y_pred, sample_weight=None):
    metric = self.huber_fn(y_true, y_pred)
    self.total.assign_add(tf.reduce_sum(metric))
    self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

  def result(self):
    return self.total / self.count

  def get_config(self):
    base_config = super().get_config()
    return {**base_config, "threshold": self.threshold}

### Custom Layers

In [11]:
class MyDense(tf.keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = tf.keras.activations.get(activation)

    def build(self, batch_input_shape):
        self.kernel = self.add_weight(
            name="kernel", shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal")
    
        self.bias = self.add_weight(
            name="bias", shape=[self.units],
            initializer="zeros")

        super().build(batch_input_shape)        # must be at the end

    def call(self, X):
        return self.activation(X @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape):
        return tf.TensorShape(batch_input_shape.as_list()[:1] + [self.units])

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, 
                "units": self.units,
                "activation": tf.keras.activations.serialize(self.activation)}

### Custom Models

In [12]:
class ResidualBlock(tf.keras.layers.Layer):
    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        self.n_layers = n_layers
        self.n_neurons = n_neurons
        self.hiddden = [tf.keras.layers.Dense(self.n_neurons, 
                                            activation="elu",
                                            kernel_initializer="he_normal")
                        for _ in range(self.n_layers)]

        
        def call(self, inputs):
            x = inputs
            for layer in self.hidden:
                x = layer(x)
            return inputs + x

        def get_config(self):
            base_config = super().get_config()
            return {**base_config,
                    "n_layers": self.n_layers,
                    "n_neurons": self.n_neurons}

In [13]:
class ResidualRegressor(tf.keras.Model):
    def __init__(self, output_dim, units, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.output_dim = output_dim
        self.hidden1 = tf.keras.layers.Dense(units=self.units, activation="relu")
        self.residualblock1 = ResidualBlock(2, self.units)
        self.residualblock2 = ResidualBlock(2, self.units)
        self.hidden2 = tf.keras.layers.Dense(units=self.output_dim)

    def call(self, inputs):
        inputs = self.hidden1(inputs)
        for _ in range(1+3):
            inputs = self.residualblock1(inputs)
        inputs = self.residualblock2(inputs)
        inputs = self.hidden2(inputs)
        return inputs

    def get_config(self):
        base_config = super().get_config()
        return {**base_config,
                "units": self.units,
                "output_dim": self.output_dim}
        

### Computing Gradients Using Autodiff

In [14]:
def f(w1, w2):
    return tf.math.multiply(3, tf.math.square(w1)) + tf.math.multiply(2, w1 * w2)

In [15]:
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
    z = f(w1, w2)

gradients = tape.gradient(z, [w1, w2])
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [16]:
# tape is automatically erased immediately after you call its gradient() method => get error when call gradient() twice:
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
    z = f(w1, w2)

dz_dw1 = tape.gradient(z, w1)
try:
    dz_dw2 = tape.gradient(z, w2)   
except:
    print("--------------- ERROR --------------")
    print("Use tape twice")

--------------- ERROR --------------
Use tape twice


In [17]:
# Solution: use persistent=True
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape(persistent=True) as tape:
    z = f(w1, w2)

dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2) 
print(dz_dw1, dz_dw2)

tf.Tensor(36.0, shape=(), dtype=float32) tf.Tensor(10.0, shape=(), dtype=float32)


In [18]:
# Defaul, tape only track operations involving variables
c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
    z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2])
print("---------- ERROR -------------")
print("Not Variable")
print(gradients)


---------- ERROR -------------
Not Variable
[None, None]


In [19]:
# Solution: Force tape track object
with tf.GradientTape() as tape:
    tape.watch(c1)
    tape.watch(c2)
    z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2])
print(gradients)

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>, <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]


In [20]:
# stop gradient

def f(w1, w2):
    return tf.math.multiply(3, tf.math.square(w1)) + tf.stop_gradient(tf.math.multiply(2, w1 * w2))

with tf.GradientTape() as tape:
    z = f(w1, w2)

tape.gradient(z, [w1, w2])

[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]

### Custom Training Loops

In [21]:
# build simple model
l2_reg = tf.keras.regularizers.l2(0.05)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(30,
    activation='relu',
    kernel_initializer='he_normal',
    kernel_regularizer=l2_reg)
])

In [22]:
# random sample batch
def random_batch(X, y, batch_size=32):
    import numpy as np
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]

In [23]:
# Function display the training status
def print_status_bar(iteration, total, loss, metrics=None):
    metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result()) for m in [loss] + (metrics or [])])
    end = "" if iteration < total else "\n"
    print("\r{}/{} - ".format(iteration, total) + metrics, end=end)

In [24]:
# Define Hyper-paramater
X_train, y_train= tf.constant([1,2,3]), tf.constant([4,5,6])
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = tf.keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = tf.keras.losses.mean_absolute_error
mean_loss = tf.keras.metrics.Mean()
metrics = [tf.keras.metrics.MeanAbsoluteError()]

In [25]:
# Training loop
for epoch in range(1, n_epochs + 1):
    print("Epoch {}/{}".format(epoch, n_epochs))
    for step in range(1, n_steps + 1):
        X_batch, y_batch = random_batch(X_train, y_train)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        # Calculate Gradients
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    for metric in [mean_loss] + metrics:
        metric.reset_states()


Epoch 1/5
3/3 - mean: 0.0000 - mean_absolute_error: 0.0000
Epoch 2/5
3/3 - mean: 0.0000 - mean_absolute_error: 0.0000
Epoch 3/5
3/3 - mean: 0.0000 - mean_absolute_error: 0.0000
Epoch 4/5
3/3 - mean: 0.0000 - mean_absolute_error: 0.0000
Epoch 5/5
3/3 - mean: 0.0000 - mean_absolute_error: 0.0000


In [26]:
y = tf.constant([1,2,3], dtype=tf.float32)
# calculate softmax
y_softmax = tf.nn.log_softmax(y)
y_softmax

<tf.Tensor: shape=(3,), dtype=float32, numpy=array([-2.4076061 , -1.407606  , -0.40760604], dtype=float32)>

In [28]:
a = tf.nn.log_softmax(y_softmax)
a

<tf.Tensor: shape=(3,), dtype=float32, numpy=array([-2.4076061 , -1.407606  , -0.40760604], dtype=float32)>

In [29]:
tf.nn.log_softmax(a)

<tf.Tensor: shape=(3,), dtype=float32, numpy=array([-2.4076061 , -1.407606  , -0.40760604], dtype=float32)>

: 