In [29]:
import tensorflow as tf
import numpy as np
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

Loss function

In [30]:
def huber_fn(y_true, y_pred):
    """Element-wise Huber loss with a fixed threshold of 1.

    Residuals whose magnitude is below 1 are penalised quadratically
    (``error**2 / 2``); all other residuals are penalised linearly
    (``|error| - 0.5``).
    """
    residual = y_true - y_pred
    magnitude = tf.abs(residual)
    quadratic = tf.square(residual) / 2
    linear = magnitude - 0.5
    return tf.where(magnitude < 1, quadratic, linear)

# usage
# model.compile(loss=huber_fn, optimizer="nadam")

In [31]:
def create_huber(threshold=1.0):
    """Build a Huber loss function that uses the given threshold.

    The returned callable takes ``(y_true, y_pred)`` and yields the
    element-wise loss: ``error**2 / 2`` where ``|error| < threshold`` and
    ``threshold * |error| - threshold**2 / 2`` everywhere else.
    """
    def huber_fn(y_true, y_pred):
        residual = y_true - y_pred
        magnitude = tf.abs(residual)
        quadratic = tf.square(residual) / 2
        # Linear branch, shifted down by threshold**2 / 2.
        linear = threshold * magnitude - threshold**2 * 0.5
        return tf.where(magnitude < threshold, quadratic, linear)
    return huber_fn

In [32]:
class HuberLoss(tf.keras.losses.Loss):
    """Huber loss with a configurable threshold, as a Keras ``Loss`` subclass.

    Args:
        threshold: residual magnitude below which the loss is quadratic
            (``error**2 / 2``); at or above it the loss is linear
            (``threshold * |error| - threshold**2 / 2``).
        **kwargs: forwarded to ``tf.keras.losses.Loss`` (e.g. ``name``,
            ``reduction``).
    """

    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        # Forward kwargs to the base class: previously they were dropped, so
        # `name`/`reduction` were silently ignored, including when the loss
        # was rebuilt from the dictionary returned by get_config().
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        """Return the element-wise Huber loss for the given targets and predictions."""
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss = self.threshold * tf.abs(error) - self.threshold**2 * 0.5
        return tf.where(is_small_error, squared_loss, linear_loss)

    def get_config(self):
        """Return the base config extended with ``threshold``."""
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

Activation function

In [33]:
def softplus(z):
    """Softplus activation: ``log(1 + exp(z))``, computed element-wise.

    Delegates to TensorFlow's built-in op. Spelling the formula out as
    ``tf.math.log(1.0 + tf.exp(z))`` overflows for large ``z`` because
    ``exp(z)`` becomes ``inf``, which turns the result into ``inf`` instead of
    a value close to ``z``.
    """
    return tf.nn.softplus(z)

Initializer

In [34]:
def glorot_initializer(shape, dtype=tf.float32):
    """Glorot-style normal initializer for a 2-D kernel.

    Args:
        shape: kernel shape; only ``shape[0]`` and ``shape[1]`` are used to
            derive the standard deviation.
        dtype: dtype of the returned tensor.

    Returns:
        A tensor of the given shape drawn from a normal distribution with
        standard deviation ``sqrt(2 / (shape[0] + shape[1]))``.
    """
    fan_in, fan_out = shape[0], shape[1]
    # Keep stddev a plain Python float: a float32 tensor (as produced by
    # tf.sqrt) cannot be converted by tf.random.normal when a different
    # dtype such as float64 is requested.
    stddev = (2.0 / (fan_in + fan_out)) ** 0.5
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

Regularizer

In [35]:
def l1_regularizer(weights):
    """L1 penalty: sum of the absolute values of the weights scaled by 0.01."""
    scaled = 0.01 * weights
    penalty = tf.reduce_sum(tf.abs(scaled))
    return penalty

Constraints

In [36]:
def positive_weights(weights):
    """Constraint that replaces every negative weight with zero."""
    negative_mask = weights < 0.
    zeros = tf.zeros_like(weights)
    return tf.where(negative_mask, zeros, weights)

Sample usage

In [37]:
# Single-unit Dense layer wired up with the custom activation, initializer,
# regularizer and constraint functions.
layer = tf.keras.layers.Dense(
    units=1,
    activation=softplus,
    kernel_initializer=glorot_initializer,
    kernel_regularizer=l1_regularizer,
    kernel_constraint=positive_weights,
)

Regularizer with a configurable factor, written as a class

In [38]:
class L1Regularizer(tf.keras.regularizers.Regularizer):
    """L1 weight penalty whose scaling factor is chosen at construction time."""

    def __init__(self, factor) -> None:
        self.factor = factor

    def __call__(self, weights):
        """Return the sum of absolute values of ``factor * weights``."""
        scaled = self.factor * weights
        return tf.reduce_sum(tf.abs(scaled))

    def get_config(self):
        """Return the constructor arguments needed to rebuild this regularizer."""
        return dict(factor=self.factor)

Metric (streaming)

In [39]:
class HuberMetric(tf.keras.metrics.Metric):
    """Streaming mean of the Huber loss across batches.

    Keeps a running sum of per-element Huber losses (``total``) and the number
    of elements seen (``count``); ``result()`` is their ratio.

    Args:
        threshold: Huber threshold passed to ``create_huber``.
        **kwargs: forwarded to ``tf.keras.metrics.Metric`` (e.g. ``name``,
            ``dtype``).
    """

    def __init__(self, threshold, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        # Pass the variable name by keyword: the positional order of
        # add_weight's parameters is not the same in every Keras version.
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the Huber loss of one batch.

        NOTE: ``sample_weight`` is accepted for API compatibility but is not
        used; every element contributes equally.
        """
        sample_metrics = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(sample_metrics))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

    def result(self):
        """Return the mean Huber loss seen so far (0 before any update)."""
        # divide_no_nan yields 0 instead of NaN while count is still zero.
        return tf.math.divide_no_nan(self.total, self.count)

    def get_config(self):
        """Return the base config extended with ``threshold``."""
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

Layer (no weights)

In [40]:
# Stateless layer that applies the element-wise exponential to its input.
exponential_layer = tf.keras.layers.Lambda(
    lambda inputs: tf.exp(inputs)
)

Layer (with weights)

In [41]:
class CustomDense(tf.keras.layers.Layer):
    """Minimal fully connected layer: ``activation(X @ kernel + bias)``.

    Args:
        units: number of output units.
        activation: activation function, its Keras name (e.g. ``"relu"``), or
            ``None`` for no activation.
        **kwargs: forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        # Resolve names and None into a callable. Without this the default
        # activation=None made call() fail with "'NoneType' is not callable",
        # and string names were never turned into functions.
        self.activation = tf.keras.activations.get(activation)

    def build(self, input_shape):
        """Create the kernel and bias once the input feature size is known."""
        self.kernel = self.add_weight(name="kernel", shape=[input_shape[-1], self.units], initializer='glorot_normal')
        self.bias = self.add_weight(name="bias", shape=[self.units], initializer='zeros')

    def call(self, X):
        """Apply the affine transform followed by the activation."""
        return self.activation(X @ self.kernel + self.bias)

    def get_config(self):
        """Return the base config extended with ``units`` and the serialized activation."""
        base_config = super().get_config()
        return {**base_config, "units": self.units, "activation": tf.keras.activations.serialize(self.activation)}


Model with residual block layer

In [42]:
class ResidualBlock(tf.keras.layers.Layer):
    """Stack of ReLU Dense layers whose output is added back to the block input.

    Because of the skip connection, ``n_neurons`` has to be compatible with
    the size of the incoming features for the final addition to work.
    """

    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        stack = []
        for _ in range(n_layers):
            dense = tf.keras.layers.Dense(
                n_neurons,
                activation='relu',
                kernel_initializer='he_normal',
            )
            stack.append(dense)
        self.hidden = stack

    def call(self, inputs):
        """Run the inputs through every hidden layer, then add the skip connection."""
        outputs = inputs
        for dense in self.hidden:
            outputs = dense(outputs)
        return inputs + outputs

In [43]:
class ResidualRegressor(tf.keras.Model):
    """Regressor made of a Dense layer, two residual blocks and a linear output.

    The first residual block is applied four times in a row (reusing the same
    layer object each time); the second block is applied once.
    """

    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hiddenl = tf.keras.layers.Dense(
            30,
            activation='relu',
            kernel_initializer='he_normal',
        )
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = tf.keras.layers.Dense(output_dim)

    def call(self, inputs):
        """Forward pass: Dense -> block1 (x4) -> block2 -> output layer."""
        features = self.hiddenl(inputs)
        for _ in range(4):
            features = self.block1(features)
        features = self.block2(features)
        return self.out(features)

Model with an auxiliary loss computed from its internals (reconstruction loss)

In [44]:
class ReconstructingRegressor(tf.keras.Model):
    """Regressor with an auxiliary reconstruction loss.

    Besides the main output, an extra Dense layer tries to rebuild the inputs
    from the last hidden representation. The mean squared reconstruction
    error, scaled by 0.05, is registered through ``add_loss``.
    """

    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        hidden_layers = []
        for _ in range(5):
            hidden_layers.append(
                tf.keras.layers.Dense(30, activation='relu', kernel_initializer='he_normal')
            )
        self.hidden = hidden_layers
        self.out = tf.keras.layers.Dense(output_dim)
        self.reconstruction_mean = tf.keras.metrics.Mean(name="reconstruction_error")

    def build(self, batch_input_shape):
        """Create the reconstruction layer, sized to the number of input features."""
        n_inputs = batch_input_shape[-1]
        self.reconstruct = tf.keras.layers.Dense(n_inputs)

    def call(self, inputs, training=False):
        """Forward pass that also records the reconstruction loss (and metric when training)."""
        hidden_out = inputs
        for dense in self.hidden:
            hidden_out = dense(hidden_out)
        reconstruction = self.reconstruct(hidden_out)
        squared_diff = tf.square(reconstruction - inputs)
        recon_loss = tf.reduce_mean(squared_diff)
        self.add_loss(0.05 * recon_loss)
        if training:
            # Track the running mean of the reconstruction error as a metric.
            self.add_metric(self.reconstruction_mean(recon_loss))
        return self.out(hidden_out)

Computing gradients with autodiff

In [45]:
def f(w1, w2):
    """Toy function ``3 * w1**2 + 2 * w1 * w2`` used to demonstrate gradients."""
    quadratic_term = 3 * w1 ** 2
    cross_term = 2 * w1 * w2
    return quadratic_term + cross_term

In [46]:
# Record f(w1, w2) on a tape and differentiate it with respect to both variables.
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
    z = f(w1, w2)

# This tape is not persistent, so gradient() is only meant to be called once on it.
gradients = tape.gradient(z, [w1, w2])
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [47]:
# A persistent tape allows gradient() to be called more than once.
with tf.GradientTape(persistent=True) as tape:
    z = f(w1, w2)

dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2)
# A persistent tape keeps its recorded operations until the tape object is
# garbage collected, so drop the reference as soon as we are done with it.
del tape

In [48]:
# Same computation, but with constant tensors instead of variables.
c1, c2 = tf.constant(5.), tf.constant(3.)

with tf.GradientTape() as tape:
    z = f(c1, c2)

# The constants were not watched by the tape, so both gradients come back as None.
gradients = tape.gradient(z, [c1, c2])
gradients

[None, None]

In [49]:
with tf.GradientTape() as tape:
    # Explicitly watching the constant tensors makes the tape track them,
    # so their gradients can be computed.
    tape.watch(c1)
    tape.watch(c2)
    z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2])
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [50]:
def f(w1, w2):
    """Return ``3 * w1**2 + 2 * w1 * w2`` with the cross term excluded from back-propagation.

    The ``2 * w1 * w2`` term is wrapped in ``tf.stop_gradient``, so gradients
    only flow through the quadratic term.
    """
    # NOTE: this definition replaces any earlier `f` in the notebook namespace.
    quadratic_term = 3 * w1 ** 2
    frozen_cross_term = tf.stop_gradient(2 * w1 * w2)
    return quadratic_term + frozen_cross_term

# Differentiate the version of f whose cross term is wrapped in stop_gradient:
# only the 3 * w1**2 term contributes, and w2 gets no gradient at all.
with tf.GradientTape() as tape:
    z = f(w1, w2)

gradients = tape.gradient(z, [w1, w2])
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]

Custom training loop

In [51]:
# Small regression network: one hidden ReLU layer and a single-unit output.
# The same L2 regularizer object (factor 0.05) is attached to both kernels.
l2_reg = tf.keras.regularizers.l2(0.05)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(30, activation='relu', kernel_initializer='he_normal', kernel_regularizer=l2_reg),
    tf.keras.layers.Dense(1, kernel_regularizer=l2_reg)
])

In [52]:
def random_batch(X, y, batch_size=32):
    """Draw a random batch of matching rows from ``X`` and ``y``.

    Row indices are drawn with ``np.random.randint``, so the same row may
    appear more than once in a batch.
    """
    n_samples = len(X)
    chosen = np.random.randint(n_samples, size=batch_size)
    features, targets = X[chosen], y[chosen]
    return features, targets

def print_status_bar(step, total, loss, metrics=None):
    """Print a one-line progress report that overwrites itself.

    The line shows ``step/total`` followed by the name and current result of
    ``loss`` and of every entry in ``metrics``. While ``step < total`` the
    line is left open; otherwise it is terminated with a newline.
    """
    tracked = [loss] + (metrics or [])
    fields = [f"{m.name}: {m.result():.4f}" for m in tracked]
    summary = " - ".join(fields)
    if step < total:
        terminator = ""
    else:
        terminator = "\n"
    print(f"\r{step}/{total} - " + summary, end=terminator)

In [53]:
# Data: California housing, split into train / validation / test sets.
# Targets are reshaped into a single column.
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

# The scaler is fitted on the training split only and then applied to the
# validation and test splits.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

# Hyperparameters and the objects used by the training loop.
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size  # batches drawn per epoch
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
loss_fn = tf.keras.losses.mean_squared_error
mean_loss = tf.keras.metrics.Mean(name='mean_loss')  # running mean of the total loss
metrics = [tf.keras.metrics.MeanAbsoluteError()]

In [54]:
# Manual training loop: n_epochs epochs of n_steps randomly drawn batches each.
for epoch in range(1, n_epochs+1):
    print('Epoch {}/{}'.format(epoch, n_epochs))
    for step in range(1, n_steps+1):
        # Pass batch_size explicitly so the batches stay in sync with n_steps,
        # which is derived from the same variable, instead of silently relying
        # on random_batch's own default.
        X_batch, y_batch = random_batch(X_train_scaled, y_train, batch_size=batch_size)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True) # using model to predict values
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred)) # prediction loss = mean of (mse for each sample)
            loss = tf.add_n([main_loss] + model.losses) # overall loss = sum of prediction loss and regularization loss

        gradients = tape.gradient(loss, model.trainable_variables) # loss function gradients for each modifiable variable
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        # Update the running mean of the loss and the extra metrics for this epoch.
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)

        print_status_bar(step, n_steps, mean_loss, metrics)

    # End-of-epoch summary line, expressed in number of training samples.
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    # Start every epoch with fresh running statistics.
    for metric in [mean_loss] + metrics:
        metric.reset_states()

Epoch 1/5
362/362 - mean_loss: 3.0170 - mean_absolute_error: 0.6767
11610/11610 - mean_loss: 3.0170 - mean_absolute_error: 0.6767
Epoch 2/5
362/362 - mean_loss: 2.0820 - mean_absolute_error: 0.5647
11610/11610 - mean_loss: 2.0820 - mean_absolute_error: 0.5647
Epoch 3/5
362/362 - mean_loss: 1.3927 - mean_absolute_error: 0.5114
11610/11610 - mean_loss: 1.3927 - mean_absolute_error: 0.5114
Epoch 4/5
362/362 - mean_loss: 0.9670 - mean_absolute_error: 0.4914
11610/11610 - mean_loss: 0.9670 - mean_absolute_error: 0.4914
Epoch 5/5
362/362 - mean_loss: 0.7787 - mean_absolute_error: 0.4880
11610/11610 - mean_loss: 0.7787 - mean_absolute_error: 0.4880
