In [1]:
import tensorflow as tf

In [2]:
# Build a 3x3 constant tensor from a nested Python list and display it.
tensor = tf.constant([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
tensor

<tf.Tensor: shape=(3, 3), dtype=int32, numpy=
array([[1, 2, 3],
       [4, 5, 6],
       [7, 8, 9]])>

In [3]:
# A scalar constant: the resulting tensor has the empty shape ().
tf.constant(42)

<tf.Tensor: shape=(), dtype=int32, numpy=42>

In [4]:
# Shape of the tensor (3 rows, 3 columns).
tensor.shape

TensorShape([3, 3])

In [5]:
# Element type of the tensor (int32 here, since it was built from Python ints).
tensor.dtype

tf.int32

In [6]:
# Tensors support NumPy-style slicing; a full slice on both axes returns every element.
tensor[:, :]

<tf.Tensor: shape=(3, 3), dtype=int32, numpy=
array([[1, 2, 3],
       [4, 5, 6],
       [7, 8, 9]])>

In [7]:
tensor[..., 0]
# The ellipsis (...) stands for "all leading dimensions", so this picks index 0 along the last axis (the first column).

<tf.Tensor: shape=(3,), dtype=int32, numpy=array([1, 4, 7])>

In [8]:
# The scalar 10 is added to every element.
tensor + 10

<tf.Tensor: shape=(3, 3), dtype=int32, numpy=
array([[11, 12, 13],
       [14, 15, 16],
       [17, 18, 19]])>

In [9]:
# Element-wise square.
tf.square(tensor)

<tf.Tensor: shape=(3, 3), dtype=int32, numpy=
array([[ 1,  4,  9],
       [16, 25, 36],
       [49, 64, 81]])>

In [10]:
# Swap rows and columns.
tf.transpose(tensor)

<tf.Tensor: shape=(3, 3), dtype=int32, numpy=
array([[1, 4, 7],
       [2, 5, 8],
       [3, 6, 9]])>

In [11]:
# The @ operator performs matrix multiplication (same result as tf.matmul below).
tensor@tensor

<tf.Tensor: shape=(3, 3), dtype=int32, numpy=
array([[ 30,  36,  42],
       [ 66,  81,  96],
       [102, 126, 150]])>

In [12]:
# With axes=1, tensordot on two matrices gives the same result as the matrix product.
tf.tensordot(tensor, tensor, axes=1)

<tf.Tensor: shape=(3, 3), dtype=int32, numpy=
array([[ 30,  36,  42],
       [ 66,  81,  96],
       [102, 126, 150]])>

In [13]:
# Explicit function form of the matrix product.
tf.matmul(tensor, tensor)

<tf.Tensor: shape=(3, 3), dtype=int32, numpy=
array([[ 30,  36,  42],
       [ 66,  81,  96],
       [102, 126, 150]])>

In [14]:
# Sum of all elements, returned as a scalar tensor.
tf.reduce_sum(tensor)

<tf.Tensor: shape=(), dtype=int32, numpy=45>

There is also a way to use keras in low-level context:

In [15]:
from tensorflow import keras
# Short alias for the Keras backend module, which exposes its own low-level ops.
K = keras.backend

# Transpose, square element-wise, then add 10 to every element.
K.square(K.transpose(tensor)) + 10

<tf.Tensor: shape=(3, 3), dtype=int32, numpy=
array([[11, 26, 59],
       [14, 35, 74],
       [19, 46, 91]])>

Tensorflow works well with NumPy

In [16]:
import numpy as np

# A NumPy array can be passed directly to tf.constant to create a tensor.
a = np.array([4, 5, 6])
tf.constant(a)

<tf.Tensor: shape=(3,), dtype=int32, numpy=array([4, 5, 6])>

In [17]:
# Convert the tensor back to a NumPy array.
tensor.numpy()

array([[1, 2, 3],
       [4, 5, 6],
       [7, 8, 9]])

In [18]:
# TensorFlow ops accept NumPy arrays and return tensors.
tf.square(a)

<tf.Tensor: shape=(3,), dtype=int32, numpy=array([16, 25, 36])>

In [19]:
# NumPy functions accept tensors and return NumPy arrays.
np.square(tensor)

array([[ 1,  4,  9],
       [16, 25, 36],
       [49, 64, 81]])

Type conversions can significantly hurt performance in machine learning, which is why TensorFlow does not perform them automatically.


Also, as the name suggests, a tensor created with tf.constant is immutable: its values cannot be changed. When values need to change, use tf.Variable instead.

In [20]:
# A mutable 3x2 variable; unlike a constant, it can be updated in place.
var = tf.Variable([[4, 5], [6, 7], [8, 9]])
var

<tf.Variable 'Variable:0' shape=(3, 2) dtype=int32, numpy=
array([[4, 5],
       [6, 7],
       [8, 9]])>

One can change the value of the variable using assign() method.

In [21]:
# Overwrite the whole variable with twice its current value.
var.assign(2 * var)

<tf.Variable 'UnreadVariable' shape=(3, 2) dtype=int32, numpy=
array([[ 8, 10],
       [12, 14],
       [16, 18]])>

In [22]:
# Update a single element (row 1, column 0) in place.
var[1, 0].assign(11)

<tf.Variable 'UnreadVariable' shape=(3, 2) dtype=int32, numpy=
array([[ 8, 10],
       [11, 14],
       [16, 18]])>

In [23]:
# Update an entire row (row 0) in place.
var[0, :].assign([1, 2])

<tf.Variable 'UnreadVariable' shape=(3, 2) dtype=int32, numpy=
array([[ 1,  2],
       [11, 14],
       [16, 18]])>

In [24]:
# Update several individual cells at once: each [row, col] index receives the matching value from `updates`.
var.scatter_nd_update(indices=[[0, 1], [1, 0], [1, 1]], updates=[48, 49, 50])

<tf.Variable 'UnreadVariable' shape=(3, 2) dtype=int32, numpy=
array([[ 1, 48],
       [49, 50],
       [16, 18]])>

# Custom models and Training Algorithms
Let's begin with a custom loss function on the California housing dataset.

In [25]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Load the California housing dataset.
housing = fetch_california_housing()

# First split: hold out a test set. Targets are reshaped into a single column.
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)

# Second split: carve a validation set out of the remaining training data.
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

# Fit the scaler on the training set only, then reuse its statistics for the
# validation and test sets.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

In [26]:
# Huber loss with a fixed threshold of 1 (the function keeps the notebook's original spelling).
def heuber_fn(y_true, y_pred):
    """Element-wise Huber loss with threshold 1.

    The loss is quadratic (``error**2 / 2``) where ``|error| < 1`` and
    linear (``|error| - 0.5``) everywhere else.
    """
    residual = y_true - y_pred
    return tf.where(
        tf.abs(residual) < 1,
        tf.square(residual) / 2,
        tf.abs(residual) - 0.5,
    )

In [27]:
# Per-sample feature shape (everything except the leading sample dimension).
input_shape = X_train.shape[1:]

# Small regression network: one hidden SELU layer of 30 units followed by a
# single output unit with no activation.
model = keras.models.Sequential()
model.add(keras.layers.Dense(units=30, activation='selu', input_shape=input_shape))
model.add(keras.layers.Dense(units=1))

In [28]:
# Train with the custom loss function passed directly to compile().
model.compile(loss=heuber_fn, optimizer='nadam')
model.fit(X_train_scaled, y_train, validation_data=(X_valid_scaled, y_valid), epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x29eead7eb90>

Saving a model that uses a custom function works without extra steps. When loading it, however, we must pass a `custom_objects` dictionary that maps the saved function name to the function defined in our code.

In [29]:
# Save the model in HDF5 format.
# NOTE(review): assumes the "models/" directory already exists — confirm, or create it before saving.
model.save("models/ch12_custom_model.h5")

  saving_api.save_model(


In [30]:
# Reload the model; custom_objects maps the saved loss name to the Python function.
model = keras.models.load_model("models/ch12_custom_model.h5", custom_objects={"heuber_fn": heuber_fn})

Here are examples of some custom functions like custom activation function, custom initializer, custom regularizer and custom constraint.

In [31]:
def my_softplus(z):
    """Naive softplus activation, ``log(exp(z) + 1)``, with no overflow protection."""
    exp_z = tf.exp(z)
    return tf.math.log(exp_z + 1.0)

In [32]:
def my_glorot_initializer(shape, dtype=tf.float32):
    """Glorot-style normal initializer.

    Draws values from a normal distribution whose standard deviation is
    ``sqrt(2 / (shape[0] + shape[1]))``.
    """
    fan_in, fan_out = shape[0], shape[1]
    stddev = tf.sqrt(2. / float(fan_in + fan_out))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

In [33]:
def my_l1_regularizer(weights):
    """L1 penalty: the sum of ``|0.01 * w|`` over all weights."""
    scaled_weights = 0.01 * weights
    return tf.reduce_sum(tf.abs(scaled_weights))

In [34]:
def my_positive_weights(weights):
    """Weight constraint: replace negative entries with zero, keep the rest unchanged."""
    is_negative = weights < 0.
    zeros = tf.zeros_like(weights)
    return tf.where(is_negative, zeros, weights)

In [35]:
# Same architecture as before, but the hidden layer uses the hand-written
# activation, initializer, regularizer and constraint defined above.
model_2 = keras.models.Sequential([
    keras.layers.Dense(30, activation=my_softplus,
                       kernel_initializer=my_glorot_initializer,
                       kernel_regularizer=my_l1_regularizer,
                       kernel_constraint=my_positive_weights),
    keras.layers.Dense(units=1)
])

In [36]:
# Train the customised model with the same loss, optimizer and data as the first model.
model_2.compile(loss=heuber_fn, optimizer='nadam')
model_2.fit(X_train_scaled, y_train, validation_data=(X_valid_scaled, y_valid), epochs=10)

Epoch 1/10


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x29eec271210>

Well, the validation loss is worse than before, but that is to be expected with these hand-written functions.

Now let's try writing a custom metric. Metrics are not the same as losses, because losses are for Gradient Descent for training a model, but metrics are meant for human interaction.

To keep track of a metric across batches, we can use a streaming metric object such as Precision, which accumulates its state over successive calls.

In [37]:
# First batch (y_true, y_pred): 4 predicted positives, 2 of them correct -> precision 0.5.
precision = keras.metrics.Precision()
precision([1, 1, 0, 1, 0, 0, 1], [1, 1, 0, 0, 1, 1, 0])

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [38]:
# Second batch: the returned value is the running precision over both batches (5 / 8), not just this one.
precision([1, 0, 0, 0, 1, 0, 1], [1, 0, 0, 1, 1, 0, 1])

<tf.Tensor: shape=(), dtype=float32, numpy=0.625>

In [39]:
# Current value of the metric without feeding new data.
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.625>

In [40]:
# Internal state of the metric: accumulated true-positive and false-positive counts.
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([5.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([3.], dtype=float32)>]

We can create a subclass of Metric.

In [41]:
def create_huber(threshold=1.0):
    """Build a Huber loss function for the given threshold.

    The returned ``huber_fn(y_true, y_pred)`` is quadratic
    (``error**2 / 2``) where ``|error| < threshold`` and linear
    (``threshold * |error| - threshold**2 / 2``) elsewhere.
    """
    def huber_fn(y_true, y_pred):
        residual = y_true - y_pred
        return tf.where(
            tf.abs(residual) < threshold,
            tf.square(residual) / 2,
            threshold * tf.abs(residual) - threshold**2 / 2,
        )

    return huber_fn

In [42]:
class HuberMetric(keras.metrics.Metric):
    """Streaming metric that reports the mean Huber loss over all samples seen.

    Args:
        threshold: Huber threshold passed to ``create_huber``.
        **kwargs: Forwarded to ``keras.metrics.Metric`` (e.g. ``name``, ``dtype``).
    """

    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        # Pass the weight name by keyword so it cannot be mistaken for
        # another positional argument of add_weight.
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the summed Huber loss and the number of elements.

        ``sample_weight`` is accepted because Keras passes it to every
        metric, but this metric ignores it.
        """
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

    def result(self):
        """Return the mean Huber loss accumulated so far."""
        return self.total / self.count

    def get_config(self):
        """Return the base config plus ``threshold`` so the metric can be re-created."""
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

Now let's implement custom layer behaviour. If some layers are repetitively used, we can merge them into one and call just that.

In [43]:
class MyDense(keras.layers.Layer):
    """Minimal fully connected layer: ``activation(X @ kernel + bias)``.

    Args:
        units: Number of output units.
        activation: Activation given as a callable, a Keras activation name,
            or ``None`` for no activation.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        # Resolve names and None into a callable. Storing the raw argument
        # made call() fail for the default activation=None and for strings.
        self.activation = keras.activations.get(activation)

    def build(self, batch_input_shape):
        """Create the kernel and bias once the input feature size is known."""
        self.kernel = self.add_weight(
            name="kernel", shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal"
        )
        # Explicit 1-D shape rather than a bare int.
        self.bias = self.add_weight(
            name="bias", shape=[self.units], initializer="zeros"
        )
        super().build(batch_input_shape)

    def call(self, X):
        """Apply the affine transform followed by the activation."""
        return self.activation(X @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape):
        """Return the input shape with its last dimension replaced by ``units``."""
        # Wrapping in TensorShape also accepts plain tuples and lists.
        input_dims = tf.TensorShape(batch_input_shape).as_list()
        return tf.TensorShape(input_dims[:-1] + [self.units])

    def get_config(self):
        """Return the base config plus ``units`` and the serialized activation."""
        base_config = super().get_config()
        return {**base_config, "units": self.units,
                "activation": keras.activations.serialize(self.activation)}

If we want to implement custom behaviour that is different in training and testing we can do it like below:

In [44]:
class MyGaussianNoise(keras.layers.Layer):
    """Layer that adds Gaussian noise to its input during training only.

    Args:
        stddev: Standard deviation of the noise.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, stddev, **kwargs):
        # super() must be *called*; `super.__init__(...)` raises a TypeError
        # and the layer could never be instantiated.
        super().__init__(**kwargs)
        self.stddev = stddev

    def call(self, X, training=None):
        """Return ``X`` plus noise when ``training`` is truthy, otherwise ``X`` unchanged."""
        if training:
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        return X

    def compute_output_shape(self, batch_input_shape):
        """The layer does not change the shape of its input."""
        return batch_input_shape

    def get_config(self):
        """Return the base config plus ``stddev`` so the layer can be re-created."""
        base_config = super().get_config()
        return {**base_config, "stddev": self.stddev}

## Custom model
We can implement custom model behaviour, even the absurd ones.

In [45]:
class ResidualBlock(keras.layers.Layer):
    """Stack of Dense layers with a skip connection around the whole stack.

    Args:
        n_layers: Number of Dense layers in the block.
        n_neurons: Units per Dense layer. Because the output is added to the
            block's input, this must be compatible with the input's last
            dimension.
        **kwargs: Forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, n_layers, n_neurons, **kwargs):
        # super() must be *called*; `super.__init__(...)` raises a TypeError.
        super().__init__(**kwargs)
        self.hidden = [
            keras.layers.Dense(n_neurons, activation="elu", kernel_initializer="he_normal")
            for _ in range(n_layers)
        ]

    def call(self, inputs):
        """Run the inputs through every hidden layer in turn, then add the inputs back."""
        # Chain the layers: each one consumes the previous layer's output.
        # The original mixed `z` and `Z`, so every layer received the raw
        # inputs and only the last layer's output was used.
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        return inputs + Z

This block stacks several identical Dense layers and adds a skip connection around them. Now we can build the custom model itself.

In [46]:
class ResidualRegressor(keras.models.Model):
    """Regression model: a Dense layer, a repeated residual block, a second
    residual block and an output layer.

    Args:
        output_dim: Number of units in the output layer.
        n_block1_repeats: How many times ``block1`` is applied in a row.
            NOTE(review): the original looped over ``range(inputs)``, which is
            not a valid repeat count; the default of 4 is a reviewer choice —
            TODO confirm the intended number.
        **kwargs: Forwarded to ``keras.models.Model``.
    """

    def __init__(self, output_dim, n_block1_repeats=4, **kwargs):
        # super() must be *called*; `super.__init__(...)` raises a TypeError.
        super().__init__(**kwargs)
        self.n_block1_repeats = n_block1_repeats
        self.hidden1 = keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = keras.layers.Dense(output_dim)

    def call(self, inputs):
        """Forward pass through the hidden layer, the residual blocks and the output layer."""
        Z = self.hidden1(inputs)
        # Reuse the same block (shared weights) a fixed number of times.
        # `range()` needs a Python int, not the input tensor.
        for _ in range(self.n_block1_repeats):
            Z = self.block1(Z)
        Z = self.block2(Z)
        return self.out(Z)
        

## Autodiff in tensorflow
Now let's briefly experiment with autodiff in TensorFlow. Let's define a function to differentiate.

In [47]:
def f(w1, w2):
    """Return ``3*w1**2 + 2*w1*w2``, the toy function differentiated below."""
    quadratic_term = 3 * w1 ** 2
    cross_term = 2 * w1 * w2
    return quadratic_term + cross_term

In [48]:
# Forward finite-difference approximation of the partial derivative of f with
# respect to w1 at (5, 3). The exact value is 6*w1 + 2*w2 = 36.
w1, w2 = 5, 3
eps = 1e-6
(f(w1+eps, w2) - f(w1, w2))/eps

36.000003007075065

In [49]:
# Same approximation for the partial derivative with respect to w2 (exact value: 2*w1 = 10).
(f(w1, w2+eps) - f(w1, w2))/eps

10.000000003174137

This approach lets us compute a derivative in certain points, in this case (5, 3). Another approach: Autodiff.

In [51]:
# Rebind w1 and w2 as float variables so the tape can track them.
w1, w2 = tf.Variable(5.), tf.Variable(3.)
# Operations executed inside the block are recorded on the tape.
with tf.GradientTape() as tape:
    z = f(w1, w2)
    
# Gradients of z with respect to both variables, computed from the recording.
gradients = tape.gradient(z, [w1, w2])
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

It checks out. To save memory, put only the operations that need to be recorded inside the `with tf.GradientTape()` block: the tape records the operations executed there for autodiff.

In [55]:
# gradient() cannot be called twice on the same non-persistent tape: the second
# call raises a RuntimeError. Catch only that error so the message is shown and
# any unrelated failure still surfaces.
try:
    tape.gradient(z, w1)
except RuntimeError as e:
    print(e)

A non-persistent GradientTape can only be used to compute one set of gradients (or jacobians)


It is possible to compute Hessians. To do that, simply put another GradientTape inside of GradientTape.

In [56]:
# Gradient of the naive softplus at a large input (100.); the result is nan.
x = tf.Variable([100.])
with tf.GradientTape() as tape:
    z = my_softplus(x)
    
tape.gradient(z, [x])

[<tf.Tensor: shape=(1,), dtype=float32, numpy=array([nan], dtype=float32)>]

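When autodiff is applied to our custom softplus at a large input, the result is nan because of floating-point limits: exp(100) overflows to infinity in float32, so the gradient evaluates to infinity divided by infinity. We can work around this by supplying our own gradient function with the tf.custom_gradient decorator.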

In [58]:
@tf.custom_gradient
def my_better_softplus(z):
    """Softplus, ``log(exp(z) + 1)``, with a hand-written gradient.

    Returns the forward value together with a function that maps the
    upstream gradient to ``grad / (1 + 1 / exp(z))``.
    """
    exp_z = tf.exp(z)

    def my_softplus_gradients(grad):
        # Derivative of softplus written as 1 / (1 + 1/exp(z)).
        denominator = 1 + 1 / exp_z
        return grad / denominator

    forward_value = tf.math.log(exp_z + 1)
    return forward_value, my_softplus_gradients

## Creating custom training loops
If you need extra flexibility that .fit() does not provide, then you can write your own training loop.  
Let's build a simple model

In [59]:
# One L2 regularizer object (factor 0.05) shared by both layers.
l2_reg = keras.regularizers.l2(0.05)
# Note: this rebinds `model`, replacing the model loaded earlier in the notebook.
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal", kernel_regularizer=l2_reg),
    keras.layers.Dense(1, kernel_regularizer=l2_reg)
])

In [60]:
def random_batch(X, y, batch_size=32):
    """Return ``batch_size`` randomly chosen rows of ``X`` with the matching rows of ``y``.

    Indices are drawn independently with ``np.random.randint``, so the same
    row may appear more than once in a batch.
    """
    n_samples = len(X)
    picks = np.random.randint(n_samples, size=batch_size)
    features, targets = X[picks], y[picks]
    return features, targets

In [61]:
def print_status_bar(iteration, total, loss, metrics=None):
    """Print a one-line progress report that overwrites itself.

    Each of ``loss`` and the optional ``metrics`` must expose ``.name`` and
    ``.result()``. The line starts with a carriage return and ends with a
    newline only once ``iteration`` has reached ``total``.
    """
    tracked = [loss] + (metrics or [])
    summary = " - ".join(
        "{}: {:.4f}".format(tracker.name, tracker.result()) for tracker in tracked
    )
    if iteration < total:
        terminator = ""
    else:
        terminator = "\n"
    print("\r{}/{} - ".format(iteration, total) + summary, end=terminator)

We have the foundations for our custom training loop; let's get down to business.

In [63]:
# Hyperparameters and objects used by the custom training loop.
n_epochs = 5
batch_size = 32
# Number of batches per epoch (integer division drops any partial batch).
n_steps = len(X_train) // batch_size
optimizer = keras.optimizers.Nadam(learning_rate=0.01) 
loss_fn = keras.losses.mean_squared_error
# Running mean of the total loss within an epoch.
mean_loss = keras.metrics.Mean()
# Additional metrics reported next to the loss.
metrics = [keras.metrics.MeanAbsoluteError()]

In [66]:
# Custom training loop: the outer loop runs over epochs, the inner loop over
# the batches within an epoch.
for epoch in range(1, n_epochs + 1):
    print("Epoch {}/{}".format(epoch, n_epochs))
    for step in range(1, n_steps + 1):
        # Pass batch_size explicitly so the sampled batch always matches the
        # value used for n_steps and for the progress counter below (it
        # previously relied on random_batch's default of 32).
        X_batch, y_batch = random_batch(X_train_scaled, y_train, batch_size)

        # Forward pass under the tape: main loss plus the model's
        # regularization losses.
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)

        # Compute the gradients and apply one optimizer step.
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        # Update the running mean loss and the metrics for this epoch.
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)

        # Refresh the status bar after each batch.
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)

    # Final status line for the epoch (ends with a newline).
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)

    # Start the next epoch with fresh statistics. reset_state() is the current
    # name of this method; reset_states() is its deprecated alias.
    for metric in [mean_loss] + metrics:
        metric.reset_state()
    
        

Epoch 1/5
11610/11610 - mean: 1.6789 - mean_absolute_error: 0.6110
Epoch 2/5
11610/11610 - mean: 0.6805 - mean_absolute_error: 0.5183
Epoch 3/5
11610/11610 - mean: 0.6601 - mean_absolute_error: 0.5236
Epoch 4/5
11610/11610 - mean: 0.6407 - mean_absolute_error: 0.5199
Epoch 5/5
11610/11610 - mean: 0.6651 - mean_absolute_error: 0.5303


## Tensorflow graphs
tf.function converts a Python function into a TensorFlow graph, which TensorFlow can then optimize and execute. It is easy to use and is often faster than running the plain Python function.

In [67]:
def fourth_power(x):
    """Return ``x`` raised to the fourth power."""
    return pow(x, 4)

In [68]:
# Plain Python call: returns a Python int.
fourth_power(2)

16

In [69]:
# Wrap the Python function with tf.function; the result is a callable Function object.
tf_fourth_power = tf.function(fourth_power)
tf_fourth_power

<tensorflow.python.eager.polymorphic_function.polymorphic_function.Function at 0x29efa8c56d0>

In [70]:
# The wrapped function returns a tensor instead of a Python int.
tf_fourth_power(2)

<tf.Tensor: shape=(), dtype=int32, numpy=16>

As an alternative we could use a decorator that could do this directly

In [72]:
@tf.function
def tf_fourth_power_1(x):
    """Return ``x`` raised to the fourth power; wrapped by tf.function via the decorator."""
    return pow(x, 4)

# Calling the decorated function returns a tensor.
tf_fourth_power_1(2)

<tf.Tensor: shape=(), dtype=int32, numpy=16>

In [73]:
# The original, undecorated Python function stays available as .python_function.
tf_fourth_power_1.python_function(2)

16

In [76]:
# With a tensor argument the power is applied element-wise.
tf_fourth_power_1(tf.constant([2, 3]))

<tf.Tensor: shape=(2,), dtype=int32, numpy=array([16, 81])>