## § 12.2.	Using TensorFlow like NumPy

### § 12.2.1.	Tensors and Operations

In [1]:
import tensorflow as tf

In [2]:
t = tf.constant([[1., 2., 3.], [4., 5., 6.]]) # matrix
t

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [3]:
t.shape

TensorShape([2, 3])

In [4]:
t.dtype

tf.float32

In [5]:
t[:, 1:]

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
       [5., 6.]], dtype=float32)>

In [6]:
t[..., 1, tf.newaxis]

<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
       [5.]], dtype=float32)>

In [7]:
t + 10

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
       [14., 15., 16.]], dtype=float32)>

In [8]:
tf.square(t)

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)>

In [9]:
t @ tf.transpose(t)

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>

In [10]:
tf.constant(42)

<tf.Tensor: shape=(), dtype=int32, numpy=42>

In [11]:
# added ex-post: Keras's low-level API
K = tf.keras.backend
K.square(K.transpose(t)) + 10

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[11., 26.],
       [14., 35.],
       [19., 46.]], dtype=float32)>

In [12]:
# ditto
tf.square(tf.transpose(t)) + 10

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[11., 26.],
       [14., 35.],
       [19., 46.]], dtype=float32)>

### § 12.2.2.	Tensors and NumPy

In [13]:
import numpy as np

In [14]:
a = np.array([2., 4., 5.])
tf.constant(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 4., 5.])>

In [15]:
t.numpy() # or np.array(t)

array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)

In [16]:
np.array(t) # added ex-post

array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)

In [17]:
tf.square(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4., 16., 25.])>

In [18]:
np.square(t)

array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)

### § 12.2.3.	Type Conversions

In [20]:
tf.constant(2.) + tf.constant(40.)

<tf.Tensor: shape=(), dtype=float32, numpy=42.0>

In [25]:
# tf.constant(2.) + tf.constant(40., dtype=tf.float64)

In [26]:
# added ex-post
try:
  tf.constant(2.0) + tf.constant(40)
except tf.errors.InvalidArgumentError as ex:
  print(ex)

cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a int32 tensor [Op:AddV2] name: 


In [27]:
# added ex-post
try:
  tf.constant(2.0) + tf.constant(40., dtype=tf.float64)
except tf.errors.InvalidArgumentError as ex:
  print(ex)

cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a double tensor [Op:AddV2] name: 


In [28]:
t2 = tf.constant(40., dtype=tf.float64)
tf.constant(2.0) + tf.cast(t2, tf.float32)

<tf.Tensor: shape=(), dtype=float32, numpy=42.0>

### § 12.2.4.	Variables

In [29]:
v = tf.Variable([[1., 2., 3.], [4., 5., 6.]])
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [30]:
v.assign(2 * v) # v is now [[2., 4., 6.], [8., 10., 12.]]

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [31]:
v[0, 1].assign(42) # v is now [[2., 42., 6.], [8., 10., 12.]]

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [32]:
v[:, 2].assign([0., 1.]) # v is now [[2., 42., 0.], [8., 10., 1.]]

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  0.],
       [ 8., 10.,  1.]], dtype=float32)>

In [33]:
v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.]) # v is now [[100., 42., 0.], [8., 10., 200.]]

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100.,  42.,   0.],
       [  8.,  10., 200.]], dtype=float32)>

In [34]:
# added ex-post: shows how to use scatter_update()
sparse_delta = tf.IndexedSlices(values=[[1., 2., 3.], [4., 5., 6.]],
                                indices=[1, 0])
v.scatter_update(sparse_delta)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[4., 5., 6.],
       [1., 2., 3.]], dtype=float32)>

In [39]:
# v[1] = [7., 8., 9.]

In [40]:
# added ex-post
try:
  v[1] = [7., 8., 9.]
except TypeError as ex:
  print(ex)

'ResourceVariable' object does not support item assignment


### § 12.2.5.	Other Data Structures

In [41]:
# added ex_post: string array
tf.constant(b"hello world")

<tf.Tensor: shape=(), dtype=string, numpy=b'hello world'>

In [42]:
tf.constant("café")

<tf.Tensor: shape=(), dtype=string, numpy=b'caf\xc3\xa9'>

In [43]:
u = tf.constant([ord(c) for c in "café"])
u

<tf.Tensor: shape=(4,), dtype=int32, numpy=array([ 99,  97, 102, 233], dtype=int32)>

In [44]:
b = tf.strings.unicode_encode(u, "UTF-8")
b

<tf.Tensor: shape=(), dtype=string, numpy=b'caf\xc3\xa9'>

In [45]:
tf.strings.length(b, unit="UTF8_CHAR")

<tf.Tensor: shape=(), dtype=int32, numpy=4>

In [46]:
tf.strings.unicode_decode(b, "UTF-8")

<tf.Tensor: shape=(4,), dtype=int32, numpy=array([ 99,  97, 102, 233], dtype=int32)>

In [47]:
p = tf.constant(["Café", "Coffee", "caffè", "咖啡"])

In [48]:
tf.strings.length(p, unit="UTF8_CHAR")

<tf.Tensor: shape=(4,), dtype=int32, numpy=array([4, 6, 5, 2], dtype=int32)>

In [49]:
r = tf.strings.unicode_decode(p, "UTF8")
r

<tf.RaggedTensor [[67, 97, 102, 233], [67, 111, 102, 102, 101, 101],
 [99, 97, 102, 102, 232], [21654, 21857]]>

In [50]:
# added ex-post: ragged tensors
r[1]

<tf.Tensor: shape=(6,), dtype=int32, numpy=array([ 67, 111, 102, 102, 101, 101], dtype=int32)>

In [51]:
r[1:3]  # added ex-post: a slice of a ragged tensor is a ragged tensor

<tf.RaggedTensor [[67, 111, 102, 102, 101, 101], [99, 97, 102, 102, 232]]>

In [52]:
r2 = tf.ragged.constant([[65, 66], [], [67]])
tf.concat([r, r2], axis=0)

<tf.RaggedTensor [[67, 97, 102, 233], [67, 111, 102, 102, 101, 101],
 [99, 97, 102, 102, 232], [21654, 21857], [65, 66], [], [67]]>

In [53]:
r3 = tf.ragged.constant([[68, 69, 70], [71], [], [72, 73]])
print(tf.concat([r, r3], axis=1))

<tf.RaggedTensor [[67, 97, 102, 233, 68, 69, 70], [67, 111, 102, 102, 101, 101, 71],
 [99, 97, 102, 102, 232], [21654, 21857, 72, 73]]>


In [54]:
r.to_tensor()

<tf.Tensor: shape=(4, 6), dtype=int32, numpy=
array([[   67,    97,   102,   233,     0,     0],
       [   67,   111,   102,   102,   101,   101],
       [   99,    97,   102,   102,   232,     0],
       [21654, 21857,     0,     0,     0,     0]], dtype=int32)>

In [55]:
# added ex-post: sparse tensors
s = tf.SparseTensor(indices=[[0, 1], [1, 0], [2, 3]],
                    values=[1., 2., 3.],
                    dense_shape=[3, 4])

In [56]:
tf.sparse.to_dense(s)

<tf.Tensor: shape=(3, 4), dtype=float32, numpy=
array([[0., 1., 0., 0.],
       [2., 0., 0., 0.],
       [0., 0., 0., 3.]], dtype=float32)>

In [57]:
s * 42.0

SparseTensor(indices=tf.Tensor(
[[0 1]
 [1 0]
 [2 3]], shape=(3, 2), dtype=int64), values=tf.Tensor([ 42.  84. 126.], shape=(3,), dtype=float32), dense_shape=tf.Tensor([3 4], shape=(2,), dtype=int64))

In [58]:
try:
    s + 42.0
except TypeError as ex:
    print(ex)

unsupported operand type(s) for +: 'SparseTensor' and 'float'


In [59]:
# added ex-post: how to multiply a sparse tensor and a dense tensor
s4 = tf.constant([[10., 20.], [30., 40.], [50., 60.], [70., 80.]])
tf.sparse.sparse_dense_matmul(s, s4)

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[ 30.,  40.],
       [ 20.,  40.],
       [210., 240.]], dtype=float32)>

In [60]:
# added ex-post: when creating a sparse tensor, values must be given in "reading order", or else `to_dense()` will fail
s5 = tf.SparseTensor(indices=[[0, 2], [0, 1]],  # WRONG ORDER!
                     values=[1., 2.],
                     dense_shape=[3, 4])
try:
    tf.sparse.to_dense(s5)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

{{function_node __wrapped__SparseToDense_device_/job:localhost/replica:0/task:0/device:GPU:0}} indices[1] is out of order. Many sparse ops require sorted indices.
  Use `tf.sparse.reorder` to create a correctly ordered copy.


	 [[{{node SparseToDense}}]] [Op:SparseToDense] name: 


In [61]:
# added ex-post: how to fix the sparse tensor s5 by reordering its values
s6 = tf.sparse.reorder(s5)
tf.sparse.to_dense(s6)

<tf.Tensor: shape=(3, 4), dtype=float32, numpy=
array([[0., 2., 1., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.]], dtype=float32)>

In [62]:
# added ex-post: tensor arrays
array = tf.TensorArray(dtype=tf.float32, size=3)
array = array.write(0, tf.constant([1., 2.]))
array = array.write(1, tf.constant([3., 10.]))
array = array.write(2, tf.constant([5., 7.]))
tensor1 = array.read(1)  # returns (and zeros out!) tf.constant([3., 10.])

In [63]:
array.stack()

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[1., 2.],
       [0., 0.],
       [5., 7.]], dtype=float32)>

In [64]:
# added ex-post: disable clear_after_read
array2 = tf.TensorArray(dtype=tf.float32, size=3, clear_after_read=False)
array2 = array2.write(0, tf.constant([1., 2.]))
array2 = array2.write(1, tf.constant([3., 10.]))
array2 = array2.write(2, tf.constant([5., 7.]))
tensor2 = array2.read(1)  # returns tf.constant([3., 10.])
array2.stack()

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[ 1.,  2.],
       [ 3., 10.],
       [ 5.,  7.]], dtype=float32)>

In [65]:
# added ex-post: create and use a tensor array with a dynamic size
array3 = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
array3 = array3.write(0, tf.constant([1., 2.]))
array3 = array3.write(1, tf.constant([3., 10.]))
array3 = array3.write(2, tf.constant([5., 7.]))
tensor3 = array3.read(1)
array3.stack()

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[1., 2.],
       [0., 0.],
       [5., 7.]], dtype=float32)>

In [66]:
# added ex-post: sets
a = tf.constant([[1, 5, 9]])
b = tf.constant([[5, 6, 9, 11]])
u = tf.sets.union(a, b)
u

SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]], shape=(5, 2), dtype=int64), values=tf.Tensor([ 1  5  6  9 11], shape=(5,), dtype=int32), dense_shape=tf.Tensor([1 5], shape=(2,), dtype=int64))

In [67]:
tf.sparse.to_dense(u)

<tf.Tensor: shape=(1, 5), dtype=int32, numpy=array([[ 1,  5,  6,  9, 11]], dtype=int32)>

In [68]:
a = tf.constant([[1, 5, 9], [10, 0, 0]])
b = tf.constant([[5, 6, 9, 11], [13, 0, 0, 0]])
u = tf.sets.union(a, b)
tf.sparse.to_dense(u)

<tf.Tensor: shape=(2, 5), dtype=int32, numpy=
array([[ 1,  5,  6,  9, 11],
       [ 0, 10, 13,  0,  0]], dtype=int32)>

In [69]:
# added ex-post: use a different default value, -1 in this case
a = tf.constant([[1, 5, 9], [10, -1, -1]])
b = tf.constant([[5, 6, 9, 11], [13, -1, -1, -1]])
u = tf.sets.union(a, b)
tf.sparse.to_dense(u, default_value=-1)

<tf.Tensor: shape=(2, 5), dtype=int32, numpy=
array([[ 1,  5,  6,  9, 11],
       [-1, 10, 13, -1, -1]], dtype=int32)>

In [70]:
# added ex-post: use `tf.sets.difference()`
set1 = tf.constant([[2, 3, 5, 7], [7, 9, 0, 0]])
set2 = tf.constant([[4, 5, 6], [9, 10, 0]])
tf.sparse.to_dense(tf.sets.difference(set1, set2))

<tf.Tensor: shape=(2, 3), dtype=int32, numpy=
array([[2, 3, 7],
       [7, 0, 0]], dtype=int32)>

In [71]:
# added ex-post: use `tf.sets.difference()`
tf.sparse.to_dense(tf.sets.intersection(set1, set2))

<tf.Tensor: shape=(2, 2), dtype=int32, numpy=
array([[5, 0],
       [0, 9]], dtype=int32)>

In [72]:
# added ex-post: check whether set1[0] contains 5
tf.sets.size(tf.sets.intersection(set1[:1], tf.constant([[5, 0, 0, 0]]))) > 0

<tf.Tensor: shape=(1,), dtype=bool, numpy=array([ True])>

In [73]:
# added ex-post: queues
q = tf.queue.FIFOQueue(3, [tf.int32, tf.string], shapes=[(), ()])
q.enqueue([10, b"windy"])
q.enqueue([15, b"sunny"])
q.size()

<tf.Tensor: shape=(), dtype=int32, numpy=2>

In [74]:
q.dequeue()

[<tf.Tensor: shape=(), dtype=int32, numpy=10>,
 <tf.Tensor: shape=(), dtype=string, numpy=b'windy'>]

In [75]:
q.enqueue_many([[13, 16], [b'cloudy', b'rainy']])

In [76]:
q.dequeue_many(3)

[<tf.Tensor: shape=(3,), dtype=int32, numpy=array([15, 13, 16], dtype=int32)>,
 <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'sunny', b'cloudy', b'rainy'], dtype=object)>]

## § 12.3.	Customizing Models and Training Algorithms

## § 12.3.1.	Custom Loss Functions

In [77]:
def huber_fn(y_true, y_pred):
  error = y_true - y_pred
  is_small_error = tf.abs(error) < 1
  squared_loss = tf.square(error) / 2
  linear_loss = tf.abs(error) - 0.5
  return tf.where(is_small_error, squared_loss, linear_loss)

In [78]:
# added ex-post: loads, splits and scales the California housing dataset,
# then creates a simple Keras model
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [79]:
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42
)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42
)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

input_shape = X_train.shape[1:]

tf.keras.utils.set_random_seed(42)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(30, activation="relu",
                          kernel_initializer="he_normal", input_shape=input_shape),
    tf.keras.layers.Dense(1)
])

In [80]:
model.compile(loss=huber_fn,
              optimizer="nadam",
              metrics=["mae"]) # added ex-post
model.fit(X_train, y_train,
          # [...]
          epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7a2cc92b7700>

### § 12.3.2.	Saving and Loading Models That Contain Custom Components

In [81]:
model.save("my_model_with_a_custom_loss") # added ex-post

In [82]:
model = tf.keras.models.load_model("my_model_with_a_custom_loss",
                                   custom_objects={"huber_fn": huber_fn})

In [83]:
model.fit(X_train_scaled, y_train, # added ex-post
          epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7a2ccbe6f520>

In [84]:
def create_huber(threshold=1.0):
  def huber_fn(y_true, y_pred):
    error = y_true - y_pred
    is_small_error = tf.abs(error) < threshold
    squared_loss = tf.square(error) / 2
    linear_loss = threshold * tf.abs(error) - threshold ** 2 / 2
    return tf.where(is_small_error, squared_loss, linear_loss)
  return huber_fn

model.compile(loss=create_huber(2.0),
              optimizer="nadam",
              metrics=["mae"])

In [85]:
model.fit(X_train_scaled, y_train, # added ex-post
          epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7a2ccb428280>

In [86]:
model.save("my_model_with_a_custom_loss_threshold_2") # added ex-post

In [87]:
model = tf.keras.models.load_model(
    "my_model_with_a_custom_loss_threshold_2",
    custom_objects={"huber_fn": create_huber(2.0)}
)

In [88]:
model.fit(X_train_scaled, y_train, # added ex-post
          epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7a2ccb26a320>

In [89]:
class HuberLoss(tf.keras.losses.Loss):
  def __init__(self, threshold=1.0, **kwargs):
    self.threshold = threshold
    super().__init__(**kwargs)

  def call(self, y_true, y_pred):
    error = y_true - y_pred
    is_small_error = tf.abs(error) < self.threshold
    squared_loss = tf.square(error) / 2
    linear_loss = self.threshold * tf.abs(error) - self.threshold ** 2 / 2
    return tf.where(is_small_error, squared_loss, linear_loss)

  def get_config(self):
    base_config = super().get_config()
    return {**base_config, "threshold": self.threshold}

In [90]:
# added ex-post: create another basic Keras model
tf.keras.utils.set_random_seed(42)

model = tf.keras.Sequential([
    tf.keras.layers.Dense(30, activation="relu",
                          kernel_initializer="he_normal", input_shape=input_shape),
    tf.keras.layers.Dense(1)
])

In [91]:
model.compile(loss=HuberLoss(2.),
              optimizer="nadam",
              metrics=["mae"])

In [92]:
model.fit(X_train_scaled, y_train, # added ex-post
          epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7a2ccb09c700>

In [93]:
model.save("my_model_with_a_custom_loss_class") # added ex-post

In [94]:
model = tf.keras.models.load_model("my_model_with_a_custom_loss_class",
                                   custom_objects={"HuberLoss": HuberLoss})

In [95]:
# added ex-post: checks that loading worked fine, the model can be used
model.fit(X_train_scaled, y_train,
          epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7a2ccbb8c6a0>

In [96]:
model.loss.threshold # added ex-post: the threshold was loaded correctly

2.0

### § 12.3.3.	Custom Activation Functions, Initializers, Regularizers, and Constraints

In [97]:
def my_softplus(z):
  return tf.math.log(1.0 + tf.exp(z))

def my_glorot_initializer(shape, dtype=tf.float32):
  stddev = tf.sqrt(2. / (shape[0] + shape[1]))
  return tf.random.normal(shape, stddev=stddev, dtype=dtype)

def my_l1_regularizer(weights):
  return tf.reduce_sum(tf.abs(0.01 * weights))

def my_positive_weights(weights): # return value is just tf.nn.relu(weights)
  return tf.where(weights < 0., tf.zeros_like(weights), weights)

In [98]:
layer = tf.keras.layers.Dense(1, activation=my_softplus,
                              kernel_initializer=my_glorot_initializer,
                              kernel_regularizer=my_l1_regularizer,
                              kernel_constraint=my_positive_weights)

In [99]:
# added ex-post: check that building, training, saving, loading, and
# training again works fine w/ a model containing many custom parts
tf.keras.utils.set_random_seed(42)

model = tf.keras.Sequential([
    tf.keras.layers.Dense(30, activation="relu",
                          kernel_initializer="he_normal", input_shape=input_shape),
    tf.keras.layers.Dense(1, activation=my_softplus,
                          kernel_initializer=my_glorot_initializer,
                          kernel_regularizer=my_l1_regularizer,
                          kernel_constraint=my_positive_weights)
])

model.compile(loss="mse",
              optimizer="nadam",
              metrics=["mae"])
model.fit(X_train_scaled, y_train,
          epochs=2,
          validation_data=(X_valid_scaled, y_valid))
model.save("my_model_with_many_custom_parts")
model = tf.keras.models.load_model(
    "my_model_with_many_custom_parts",
    custom_objects={"my_l1_regularizer": my_l1_regularizer,
                    "my_positive_weights": my_positive_weights,
                    "my_glorot_initializer": my_glorot_initializer,
                    "my_softplus": my_softplus}
)
model.fit(X_train_scaled, y_train,
          epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2




Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7a2ccbef40d0>

In [100]:
class MyL1Regularizer(tf.keras.regularizers.Regularizer):
  def __init__(self, factor):
    self.factor = factor

  def __call__(self, weights):
    return tf.reduce_sum(tf.abs(self.factor * weights))

  def get_config(self):
    return {"factor": self.factor}

In [101]:
# added ex-post: again, check that it works, using our custom regularizer class
tf.keras.utils.set_random_seed(42)

model = tf.keras.Sequential([
    tf.keras.layers.Dense(30, activation="relu",
                          kernel_initializer="he_normal", input_shape=input_shape),
    tf.keras.layers.Dense(1, activation=my_softplus,
                          kernel_regularizer=MyL1Regularizer(0.01),
                          kernel_constraint=my_positive_weights, kernel_initializer=my_glorot_initializer)
])

model.compile(loss="mse",
              optimizer="nadam",
              metrics=["mae"])
model.fit(X_train_scaled, y_train,
          epochs=2,
          validation_data=(X_valid_scaled, y_valid))
model.save("my_model_with_many_custom_parts")
model = tf.keras.models.load_model(
    "my_model_with_many_custom_parts",
    custom_objects={"MyL1Regularizer": MyL1Regularizer,
                    "my_positive_weights": my_positive_weights,
                    "my_glorot_initializer": my_glorot_initializer,
                    "my_softplus": my_softplus}
)
model.fit(X_train_scaled, y_train,
          epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2




Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7a2cc86077c0>

### § 12.3.4.	Custom Metrics

In [102]:
# added ex-post: create a basic Keras model
tf.keras.utils.set_random_seed(42)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(30, activation="relu",
                          kernel_initializer="he_normal",
                          input_shape=input_shape),
    tf.keras.layers.Dense(1)
])

In [103]:
model.compile(loss="mse",
              optimizer="nadam",
              metrics=[create_huber(2.0)])

In [104]:
# added ex-post: train the model w/ custom metric
model.fit(X_train_scaled, y_train,
          epochs=2)

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7a2cc83f9d20>

In [105]:
precision = tf.keras.metrics.Precision() # streaming metrics
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1])

<tf.Tensor: shape=(), dtype=float32, numpy=0.8>

In [106]:
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [107]:
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [108]:
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]

In [109]:
precision.reset_states() # both variables get reset to 0.0

In [110]:
class HuberMetric(tf.keras.metrics.Metric):
  def __init__(self, threshold=1.0, **kwargs):
    super().__init__(**kwargs) # handles base args (e.g., dtype)
    self.threshold = threshold
    self.huber_fn = create_huber(threshold)
    self.total = self.add_weight("total", initializer="zeros")
    self.count = self.add_weight("count", initializer="zeros")

  def update_state(self, y_true, y_pred, sample_weight=None):
    sample_metrics = self.huber_fn(y_true, y_pred)
    self.total.assign_add(tf.reduce_sum(sample_metrics))
    self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

  def result(self):
    return self.total / self.count

  def get_config(self):
    base_config = super().get_config()
    return {**base_config, "threshold": self.threshold}

In [111]:
# added ex-post: test the HuberMetric class and show another implementation subclassing tf.keras.metrics.Mean
m = HuberMetric(2.)

# total = 2 * |10 - 2| - 2²/2 = 14
# count = 1
# result = 14 / 1 = 14
m(tf.constant([[2.]]), tf.constant([[10.]]))

<tf.Tensor: shape=(), dtype=float32, numpy=14.0>

In [112]:
# total = total + (|1 - 0|² / 2) + (2 * |9.25 - 5| - 2² / 2) = 14 + 7 = 21
# count = count + 2 = 3
# result = total / count = 21 / 3 = 7
m(tf.constant([[0.], [5.]]), tf.constant([[1.], [9.25]]))

<tf.Tensor: shape=(), dtype=float32, numpy=7.0>

In [113]:
m.result()

<tf.Tensor: shape=(), dtype=float32, numpy=7.0>

In [114]:
m.variables

[<tf.Variable 'total:0' shape=() dtype=float32, numpy=21.0>,
 <tf.Variable 'count:0' shape=() dtype=float32, numpy=3.0>]

In [115]:
m.reset_states()
m.variables

[<tf.Variable 'total:0' shape=() dtype=float32, numpy=0.0>,
 <tf.Variable 'count:0' shape=() dtype=float32, numpy=0.0>]

In [116]:
tf.keras.utils.set_random_seed(42)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal",
                          input_shape=input_shape),
    tf.keras.layers.Dense(1),
])

In [117]:
model.compile(loss=create_huber(2.0),
              optimizer="nadam",
              metrics=[HuberMetric(2.0)])

In [118]:
model.fit(X_train_scaled, y_train,
          epochs=2)

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7a2cc842c8e0>

In [119]:
model.save("my_model_with_a_custom_metric")

In [120]:
model = tf.keras.models.load_model(
    "my_model_with_a_custom_metric",
    custom_objects={"huber_fn": create_huber(2.0),
                    "HuberMetric": HuberMetric}
)

In [121]:
model.fit(X_train_scaled, y_train,
          epochs=2)

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7a2cc8b74820>

In [122]:
model.metrics[-1].threshold

2.0

In [123]:
class HuberMetric(tf.keras.metrics.Mean):
  def __init__(self, threshold=1.0, name='HuberMetric', dtype=None):
    self.threshold = threshold
    self.huber_fn = create_huber(threshold)
    super().__init__(name=name, dtype=dtype)

  def update_state(self, y_true, y_pred, sample_weight=None):
    metric = self.huber_fn(y_true, y_pred)
    super(HuberMetric, self).update_state(metric, sample_weight)

  def get_config(self):
    base_config = super().get_config()
    return {**base_config, "threshold": self.threshold}

In [125]:
tf.keras.utils.set_random_seed(42)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(30, activation="relu",
                          kernel_initializer="he_normal",
                          input_shape=input_shape),
    tf.keras.layers.Dense(1)
])

In [126]:
model.compile(loss=tf.keras.losses.Huber(2.0),
              optimizer="nadam",
              weighted_metrics=[HuberMetric(2.0)])

In [127]:
np.random.seed(42)
sample_weight = np.random.rand(len(y_train))
history = model.fit(X_train_scaled, y_train,
                    epochs=2,
                    sample_weight=sample_weight)

Epoch 1/2
Epoch 2/2


In [128]:
(history.history["loss"][0],
 history.history["HuberMetric"][0] * sample_weight.mean())

(0.32688796520233154, 0.32688802680969753)

In [129]:
model.save("my_model_with_a_custom_metric_v2")

In [130]:
model = tf.keras.models.load_model("my_model_with_a_custom_metric_v2",
                                   custom_objects={"HuberMetric": HuberMetric})

In [131]:
model.fit(X_train_scaled, y_train,
          epochs=2)

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7a2cc7a713c0>

In [132]:
model.metrics[-1].threshold

2.0

### § 12.3.5.	Custom Layers

In [133]:
exponential_layer = tf.keras.layers.Lambda(lambda x: tf.exp(x))

In [134]:
# added ex-post: like all layers, it can be used as a function:
exponential_layer([-1., 0., 1.])

<tf.Tensor: shape=(3,), dtype=float32, numpy=array([0.36787948, 1.        , 2.7182817 ], dtype=float32)>

In [135]:
# added ex-post
tf.keras.utils.set_random_seed(42)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(30, activation="relu", input_shape=input_shape),
    tf.keras.layers.Dense(1),
    exponential_layer
])
model.compile(loss="mse",
              optimizer="sgd")
model.fit(X_train_scaled, y_train,
          epochs=5,
          validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


0.3782574236392975

In [138]:
class MyDense(tf.keras.layers.Layer):
  def __init__(self, units, activation=None, **kwargs):
    super().__init__(**kwargs)
    self.units = units
    self.activation = tf.keras.activations.get(activation)

  def build(self, batch_input_shape):
    self.kernel = self.add_weight(
        name="kernel", shape=[batch_input_shape[-1], self.units],
        initializer="glorot_normal"
    )
    self.bias = self.add_weight(
        name="bias", shape=[self.units], initializer="zeros"
    )

  def call(self, X):
    return self.activation(X @ self.kernel + self.bias)

  def get_config(self):
    base_config = super().get_config()
    return {**base_config, "units": self.units,
            "activation": tf.keras.activations.serialize(self.activation)}

In [139]:
# extra code: shows that a custom layer can be used normally
tf.keras.utils.set_random_seed(42)
model = tf.keras.Sequential([
    MyDense(30, activation="relu", input_shape=input_shape),
    MyDense(1)
])
model.compile(loss="mse",
              optimizer="nadam")
model.fit(X_train_scaled, y_train,
          epochs=2,
          validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test)
model.save("my_model_with_a_custom_layer")

Epoch 1/2
Epoch 2/2


In [140]:
# added ex-post: hows how to load a model with a custom layer
model = tf.keras.models.load_model("my_model_with_a_custom_layer",
                                   custom_objects={"MyDense": MyDense})
model.fit(X_train_scaled, y_train,
          epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7a2cc80d2f20>

In [141]:
class MyMultiLayer(tf.keras.layers.Layer):
  def call(self, X):
    X1, X2 = X
    print("X1.shape: ", X1.shape ," X2.shape: ", X2.shape)  # added ex-post
    return X1 + X2, X1 * X2, X1 / X2

In [142]:
# added ex-post: tests MyMultiLayer with symbolic inputs
inputs1 = tf.keras.layers.Input(shape=[2])
inputs2 = tf.keras.layers.Input(shape=[2])
MyMultiLayer()((inputs1, inputs2))

X1.shape:  (None, 2)  X2.shape:  (None, 2)


(<KerasTensor: shape=(None, 2) dtype=float32 (created by layer 'my_multi_layer')>,
 <KerasTensor: shape=(None, 2) dtype=float32 (created by layer 'my_multi_layer')>,
 <KerasTensor: shape=(None, 2) dtype=float32 (created by layer 'my_multi_layer')>)

In [143]:
# added ex-post: tests MyMultiLayer with actual data
X1, X2 = np.array([[3., 6.], [2., 7.]]), np.array([[6., 12.], [4., 3.]])
MyMultiLayer()((X1, X2))

X1.shape:  (2, 2)  X2.shape:  (2, 2)


(<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
 array([[ 9., 18.],
        [ 6., 10.]], dtype=float32)>,
 <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
 array([[18., 72.],
        [ 8., 21.]], dtype=float32)>,
 <tf.Tensor: shape=(2, 2), dtype=float32, numpy=
 array([[0.5      , 0.5      ],
        [0.5      , 2.3333333]], dtype=float32)>)

In [144]:
class MyGaussianNoise(tf.keras.layers.Layer):
  def __init__(self, stddev, **kwargs):
    super().__init__(**kwargs)
    self.stddev = stddev

  def call(self, X, training=False):
    if training:
      noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
      return X + noise
    else:
      return X

In [145]:
# added ex-post: tests MyGaussianNoise
tf.keras.utils.set_random_seed(42)
model = tf.keras.Sequential([
    MyGaussianNoise(stddev=1.0, input_shape=input_shape),
    tf.keras.layers.Dense(30, activation="relu",
                          kernel_initializer="he_normal"),
    tf.keras.layers.Dense(1)
])
model.compile(loss="mse",
              optimizer="nadam")
model.fit(X_train_scaled, y_train,
          epochs=2,
          validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test)

Epoch 1/2
Epoch 2/2


1.105394959449768

### § 12.3.6.	Custom Models

In [146]:
class ResidualBlock(tf.keras.layers.Layer):
  def __init__(self, n_layers, n_neurons, **kwargs):
    super().__init__(**kwargs)
    self.hidden = [tf.keras.layers.Dense(n_neurons, activation="relu",
                                         kernel_initializer="he_normal")
                   for _ in range(n_layers)]

  def call(self, inputs):
    Z = inputs
    for layer in self.hidden:
      Z = layer(Z)
    return inputs + Z

In [147]:
class ResidualRegressor(tf.keras.Model):
  def __init__(self, output_dim, **kwargs):
    super().__init__(**kwargs)
    self.hidden1 = tf.keras.layers.Dense(30, activation="relu",
                                         kernel_initializer="he_normal")
    self.block1 = ResidualBlock(2, 30)
    self.block2 = ResidualBlock(2, 30)
    self.out = tf.keras.layers.Dense(output_dim)

  def call(self, inputs):
    Z = self.hidden1(inputs)
    for _ in range(1 + 3):
      Z = self.block1(Z)
    Z = self.block2(Z)
    return self.out(Z)

In [148]:
# added ex-post: shows that the model can be used normally
tf.keras.utils.set_random_seed(42)
model = ResidualRegressor(1)
model.compile(loss="mse",
              optimizer="nadam")
history = model.fit(X_train_scaled, y_train,
                    epochs=2)
score = model.evaluate(X_test_scaled, y_test)
model.save("my_custom_model")

Epoch 1/2
Epoch 2/2


In [149]:
# added ex-post: the model can be loaded, continue training or use it to make predictions
model = tf.keras.models.load_model("my_custom_model")
history = model.fit(X_train_scaled, y_train,
                    epochs=2)
model.predict(X_test_scaled[:3])

Epoch 1/2
Epoch 2/2


array([[1.192895],
       [1.31425 ],
       [4.357673]], dtype=float32)

In [150]:
# ditto
tf.keras.utils.set_random_seed(42)
block1 = ResidualBlock(2, 30)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(30, activation="relu",
                          kernel_initializer="he_normal"),
    block1, block1, block1, block1,
    ResidualBlock(2, 30),
    tf.keras.layers.Dense(1)
])

### § 12.3.7.	Losses and Metrics Based on Model Internals

In [151]:
class ReconstructingRegressor(tf.keras.Model):
  def __init__(self, output_dim, **kwargs):
    super().__init__(**kwargs)
    self.hidden = [tf.keras.layers.Dense(30, activation="relu",
                                         kernel_initializer="he_normal")
                   for _ in range(5)]
    self.out = tf.keras.layers.Dense(output_dim)
    self.reconstruction_mean = tf.keras.metrics.Mean(
        name="reconstruction_error"
    )

  def build(self, batch_input_shape):
    n_inputs = batch_input_shape[-1]
    self.reconstruct = tf.keras.layers.Dense(n_inputs)

  def call(self, inputs, training=False):
    Z = inputs
    for layer in self.hidden:
      Z = layer(Z)
    reconstruction = self.reconstruct(Z)
    recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
    self.add_loss(0.05 * recon_loss)
    if training:
      result = self.reconstruction_mean(recon_loss)
      self.add_metric(result)
    return self.out(Z)

In [152]:
# added ex-post
tf.keras.utils.set_random_seed(42)
model = ReconstructingRegressor(1)
model.compile(loss="mse",
              optimizer="nadam")
history = model.fit(X_train_scaled, y_train,
                    epochs=5)
y_pred = model.predict(X_test_scaled)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


### § 12.3.8.	Computing Gradients Using Autodiff

In [153]:
def f(w1, w2):
  return 3 * w1 ** 2 + 2 * w1 * w2

In [154]:
w1, w2 = 5, 3
eps = 1e-6
(f(w1 + eps, w2) - f(w1, w2)) / eps

36.000003007075065

In [155]:
(f(w1, w2 + eps) - f(w1, w2)) / eps

10.000000003174137

In [157]:
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
  z = f(w1, w2)

gradients = tape.gradient(z, [w1, w2])

In [158]:
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [159]:
with tf.GradientTape() as tape:
  z = f(w1, w2)

dz_dw1 = tape.gradient(z, w1) # returns tensor 36.0
try:
  dz_dw2 = tape.gradient(z, w2) # raises a RuntimeError!
except RuntimeError as ex:
  print(ex)

A non-persistent GradientTape can only be used to compute one set of gradients (or jacobians)


In [161]:
with tf.GradientTape(persistent=True) as tape:
  z = f(w1, w2)

dz_dw1 = tape.gradient(z, w1) # returns tensor 36.0
dz_dw2 = tape.gradient(z, w2) # returns tensor 10.0, works fine now!
del tape

In [162]:
dz_dw1, dz_dw2 # added ex-post

(<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>)

In [163]:
c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
  z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2]) # returns [None, None]

In [164]:
gradients # added ex-post

[None, None]

In [165]:
with tf.GradientTape() as tape:
  tape.watch(c1)
  tape.watch(c2)
  z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2]) # returns [tensor 36., tensor 10.]

In [166]:
gradients # added ex-post

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [167]:
# added ex-post: if given a vector, tape.gradient() will compute the gradient of the vector's sum
with tf.GradientTape() as tape:
  z1 = f(w1, w2 + 2.)
  z2 = f(w1, w2 + 5.)
  z3 = f(w1, w2 + 7.)

tape.gradient([z1, z2, z3], [w1, w2])

[<tf.Tensor: shape=(), dtype=float32, numpy=136.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=30.0>]

In [168]:
# added ex-post: shows that we get the same result as the previous cell
with tf.GradientTape() as tape:
  z1 = f(w1, w2 + 2.)
  z2 = f(w1, w2 + 5.)
  z3 = f(w1, w2 + 7.)
  z = z1 + z2 + z3

tape.gradient(z, [w1, w2])

[<tf.Tensor: shape=(), dtype=float32, numpy=136.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=30.0>]

In [169]:
# added ex-post: shows how to compute the jacobians and the hessians
with tf.GradientTape(persistent=True) as hessian_tape:
  with tf.GradientTape() as jacobian_tape:
    z = f(w1, w2)
  jacobians = jacobian_tape.gradient(z, [w1, w2])
hessians = [hessian_tape.gradient(jacobian, [w1, w2])
            for jacobian in jacobians]
del hessian_tape

In [170]:
jacobians # added ex-post

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [171]:
hessians # added ex-post

[[<tf.Tensor: shape=(), dtype=float32, numpy=6.0>,
  <tf.Tensor: shape=(), dtype=float32, numpy=2.0>],
 [<tf.Tensor: shape=(), dtype=float32, numpy=2.0>, None]]

In [173]:
def f(w1, w2):
  return 3 * w1 ** 2 + tf.stop_gradient(2 * w1 * w2)

with tf.GradientTape() as tape:
  z = f(w1, w2) # the forward pass is not affected by stop_gradient()

gradients = tape.gradient(z, [w1, w2]) # returns [tensor 30., None]

In [174]:
gradients # added ex-post

[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]

In [175]:
x = tf.Variable(1e-50)
with tf.GradientTape() as tape:
  z = tf.sqrt(x)

tape.gradient(z, [x])

[<tf.Tensor: shape=(), dtype=float32, numpy=inf>]

In [176]:
tf.math.log(tf.exp(tf.constant(30., dtype=tf.float32)) + 1.) # added ex-post

<tf.Tensor: shape=(), dtype=float32, numpy=30.0>

In [177]:
# added ex-post
x = tf.Variable([1.0e30])
with tf.GradientTape() as tape:
  z = my_softplus(x)

tape.gradient(z, [x])

[<tf.Tensor: shape=(1,), dtype=float32, numpy=array([nan], dtype=float32)>]

In [178]:
def my_softplus(z):
  return tf.math.log(1 + tf.exp(-tf.abs(z))) + tf.maximum(0., z)

In [179]:
@tf.custom_gradient
def my_softplus(z):
  def my_softplus_gradients(grads): # grads = backprop'ed from upper layers
    return grads * (1 - 1 / (1 + tf.exp(z))) # stable grads of softplus

  result = tf.math.log(1 + tf.exp(-tf.abs(z))) + tf.maximum(0., z)
  return result, my_softplus_gradients

In [180]:
# added ex-post: shows that the function is now stable, as well as its gradients
x = tf.Variable([1000.])
with tf.GradientTape() as tape:
  z = my_softplus(x)

z, tape.gradient(z, [x])

(<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1000.], dtype=float32)>,
 [<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>])

### § 12.3.9.	Custom Training Loops

In [181]:
tf.keras.utils.set_random_seed(42)  # added ex-post to ensure reproducibility

In [182]:
l2_reg = tf.keras.regularizers.l2(0.05)
model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(30, activation="relu",
                          kernel_initializer="he_normal", kernel_regularizer=l2_reg),
    tf.keras.layers.Dense(1, kernel_regularizer=l2_reg)
])

In [183]:
def random_batch(X, y, batch_size=32):
  idx = np.random.randint(len(X), size=batch_size)
  return X[idx], y[idx]

In [184]:
def print_status_bar(step, total, loss, metrics=None):
  metrics = " - ".join([f"{m.name}: {m.result():.4f}"
                        for m in [loss] + (metrics or [])])
  end = "" if step < total else "\n"
  print(f"\r{step}/{total} - " + metrics, end=end)

In [185]:
tf.keras.utils.set_random_seed(42)

In [186]:
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
loss_fn = tf.keras.losses.mean_squared_error
mean_loss = tf.keras.metrics.Mean(name="mean_loss")
metrics = [tf.keras.metrics.MeanAbsoluteError()]

In [187]:
for epoch in range(1, n_epochs + 1):
  print("Epoch {}/{}".format(epoch, n_epochs))
  for step in range(1, n_steps + 1):
    X_batch, y_batch = random_batch(X_train_scaled, y_train)
    with tf.GradientTape() as tape:
      y_pred = model(X_batch, training=True)
      main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
      loss = tf.add_n([main_loss] + model.losses)

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    # added ex-post: if the model has variable constraints
    for variable in model.variables:
      if variable.constraint is not None:
        variable.assign(variable.constraint(variable))

    mean_loss(loss)
    for metric in metrics:
      metric(y_batch, y_pred)

    print_status_bar(step, n_steps, mean_loss, metrics)

  for metric in [mean_loss] + metrics:
    metric.reset_states()

Epoch 1/5
362/362 - mean_loss: 3.5419 - mean_absolute_error: 0.6640
Epoch 2/5
362/362 - mean_loss: 1.8693 - mean_absolute_error: 0.5431
Epoch 3/5
362/362 - mean_loss: 1.1428 - mean_absolute_error: 0.5030
Epoch 4/5
362/362 - mean_loss: 0.8501 - mean_absolute_error: 0.4977
Epoch 5/5
362/362 - mean_loss: 0.7280 - mean_absolute_error: 0.5014


In [188]:
for variable in model.variables:
  if variable.constraint is not None:
    variable.assign(variable.constraint(variable))

In [189]:
# added ex-post: shows how to use the tqdm package to display nice progress bars
from tqdm.notebook import trange
from collections import OrderedDict

In [190]:
# ditto
with trange(1, n_epochs + 1, desc="All epochs") as epochs:
  for epoch in epochs:
    with trange(1, n_steps + 1, desc=f"Epoch {epoch}/{n_epochs}") as steps:
      for step in steps:
        X_batch, y_batch = random_batch(X_train_scaled, y_train)
        with tf.GradientTape() as tape:
          y_pred = model(X_batch)
          main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
          loss = tf.add_n([main_loss] + model.losses)

        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        for variable in model.variables:
          if variable.constraint is not None:
            variable.assign(variable.constraint(variable))

        status = OrderedDict()
        mean_loss(loss)
        status["loss"] = mean_loss.result().numpy()
        for metric in metrics:
          metric(y_batch, y_pred)
          status[metric.name] = metric.result().numpy()

        steps.set_postfix(status)

    for metric in [mean_loss] + metrics:
      metric.reset_states()

All epochs:   0%|          | 0/5 [00:00<?, ?it/s]

Epoch 1/5:   0%|          | 0/362 [00:00<?, ?it/s]

Epoch 2/5:   0%|          | 0/362 [00:00<?, ?it/s]

Epoch 3/5:   0%|          | 0/362 [00:00<?, ?it/s]

Epoch 4/5:   0%|          | 0/362 [00:00<?, ?it/s]

Epoch 5/5:   0%|          | 0/362 [00:00<?, ?it/s]

## § 12.4.	TensorFlow Functions and Graphs

In [191]:
def cube(x):
  return x ** 3

In [192]:
cube(2)

8

In [193]:
cube(tf.constant(2.0))

<tf.Tensor: shape=(), dtype=float32, numpy=8.0>

In [194]:
tf_cube = tf.function(cube)
tf_cube

<tensorflow.python.eager.polymorphic_function.polymorphic_function.Function at 0x7a2cc7a72800>

In [195]:
tf_cube(2)

<tf.Tensor: shape=(), dtype=int32, numpy=8>

In [196]:
tf_cube(tf.constant(2.0))

<tf.Tensor: shape=(), dtype=float32, numpy=8.0>

In [197]:
@tf.function
def tf_cube(x):
  return x ** 3

In [198]:
tf_cube.python_function(2)

8

In [199]:
# added ex-post: TF functions and concrete functions
concrete_function = tf_cube.get_concrete_function(tf.constant(2.0))
concrete_function

<ConcreteFunction (x: TensorSpec(shape=(), dtype=tf.float32, name=None)) -> TensorSpec(shape=(), dtype=tf.float32, name=None) at 0x7A2CC6E2A6B0>

In [200]:
concrete_function(tf.constant(2.0))

<tf.Tensor: shape=(), dtype=float32, numpy=8.0>

In [201]:
concrete_function is tf_cube.get_concrete_function(tf.constant(2.0))

True

In [202]:
# added ex-post: exploring function definitions and graphs
concrete_function.graph

<tensorflow.python.framework.func_graph.FuncGraph at 0x7a2cc6e0acb0>

In [203]:
ops = concrete_function.graph.get_operations()
ops

[<tf.Operation 'x' type=Placeholder>,
 <tf.Operation 'pow/y' type=Const>,
 <tf.Operation 'pow' type=Pow>,
 <tf.Operation 'Identity' type=Identity>]

In [204]:
pow_op = ops[2]
list(pow_op.inputs)

[<tf.Tensor 'x:0' shape=() dtype=float32>,
 <tf.Tensor 'pow/y:0' shape=() dtype=float32>]

In [205]:
pow_op.outputs

[<tf.Tensor 'pow:0' shape=() dtype=float32>]

In [206]:
concrete_function.graph.get_operation_by_name('x')

<tf.Operation 'x' type=Placeholder>

In [207]:
concrete_function.graph.get_tensor_by_name('Identity:0')

<tf.Tensor 'Identity:0' shape=() dtype=float32>

In [208]:
concrete_function.function_def.signature

name: "__inference_tf_cube_470244"
input_arg {
  name: "x"
  type: DT_FLOAT
}
output_arg {
  name: "identity"
  type: DT_FLOAT
}

In [209]:
# added ex-post: how TF functions trace Python functions to extract their computation graphs
@tf.function
def tf_cube(x):
  print(f"x = {x}")
  return x ** 3

In [210]:
result = tf_cube(tf.constant(2.0))

x = Tensor("x:0", shape=(), dtype=float32)


In [211]:
result

<tf.Tensor: shape=(), dtype=float32, numpy=8.0>

In [212]:
result = tf_cube(2)

x = 2


In [213]:
result = tf_cube(3)

x = 3


In [214]:
result = tf_cube(tf.constant([[1., 2.]])) # added ex-post: new shape, trace!

x = Tensor("x:0", shape=(1, 2), dtype=float32)


In [215]:
result = tf_cube(tf.constant([[3., 4.], [5., 6.]])) # added ex-post: new shape, trace!



x = Tensor("x:0", shape=(2, 2), dtype=float32)


In [216]:
result = tf_cube(tf.constant([[7., 8.], [9., 10.]])) # added ex-post: same shape, no trace!

In [217]:
@tf.function(input_signature=[tf.TensorSpec([None, 28, 28], tf.float32)])
def shrink(images):
  print("Tracing", images)  # extra code to show when tracing happens
  return images[:, ::2, ::2] # drop half the rows and columns

In [218]:
tf.keras.utils.set_random_seed(42)

In [219]:
img_batch_1 = tf.random.uniform(shape=[100, 28, 28])
img_batch_2 = tf.random.uniform(shape=[50, 28, 28])
preprocessed_images = shrink(img_batch_1)  # added ex-post: works fine, traces the function
preprocessed_images = shrink(img_batch_2)  # added ex-post: works fine, same concrete function

Tracing Tensor("images:0", shape=(None, 28, 28), dtype=float32)


In [220]:
img_batch_3 = tf.random.uniform(shape=[2, 2, 2])
try:
  preprocessed_images = shrink(img_batch_3)  # added ex-post: TypeError! for incompatible inputs
except TypeError as ex:
  print(ex)

Binding inputs to tf.function failed due to `Can not cast TensorSpec(shape=(2, 2, 2), dtype=tf.float32, name=None) to TensorSpec(shape=(None, 28, 28), dtype=tf.float32, name=None)`. Received args: (<tf.Tensor: shape=(2, 2, 2), dtype=float32, numpy=
array([[[0.7413678 , 0.62854624],
        [0.01738465, 0.3431449 ]],

       [[0.51063764, 0.3777541 ],
        [0.07321596, 0.02137029]]], dtype=float32)>,) and kwargs: {} for signature: (images: TensorSpec(shape=(None, 28, 28), dtype=tf.float32, name=None)).


In [221]:
# added ex-post: using Autograph to capture control flow
@tf.function
def add_10(x):
  for i in range(10):
    x += 1
  return x

In [222]:
add_10(tf.constant(5))

<tf.Tensor: shape=(), dtype=int32, numpy=15>

In [223]:
add_10.get_concrete_function(tf.constant(5)).graph.get_operations()

[<tf.Operation 'x' type=Placeholder>,
 <tf.Operation 'add/y' type=Const>,
 <tf.Operation 'add' type=AddV2>,
 <tf.Operation 'add_1/y' type=Const>,
 <tf.Operation 'add_1' type=AddV2>,
 <tf.Operation 'add_2/y' type=Const>,
 <tf.Operation 'add_2' type=AddV2>,
 <tf.Operation 'add_3/y' type=Const>,
 <tf.Operation 'add_3' type=AddV2>,
 <tf.Operation 'add_4/y' type=Const>,
 <tf.Operation 'add_4' type=AddV2>,
 <tf.Operation 'add_5/y' type=Const>,
 <tf.Operation 'add_5' type=AddV2>,
 <tf.Operation 'add_6/y' type=Const>,
 <tf.Operation 'add_6' type=AddV2>,
 <tf.Operation 'add_7/y' type=Const>,
 <tf.Operation 'add_7' type=AddV2>,
 <tf.Operation 'add_8/y' type=Const>,
 <tf.Operation 'add_8' type=AddV2>,
 <tf.Operation 'add_9/y' type=Const>,
 <tf.Operation 'add_9' type=AddV2>,
 <tf.Operation 'Identity' type=Identity>]

In [224]:
# added ex-post: shows how to use tf.while_loop (usually @tf.function is simpler)
@tf.function
def add_10(x):
  condition = lambda i, x: tf.less(i, 10)
  body = lambda i, x: (tf.add(i, 1), tf.add(x, 1))
  final_i, final_x = tf.while_loop(condition, body, [tf.constant(0), x])
  return final_x

In [225]:
add_10(tf.constant(5))

<tf.Tensor: shape=(), dtype=int32, numpy=15>

In [226]:
add_10.get_concrete_function(tf.constant(5)).graph.get_operations()

[<tf.Operation 'x' type=Placeholder>,
 <tf.Operation 'Const' type=Const>,
 <tf.Operation 'while/maximum_iterations' type=Const>,
 <tf.Operation 'while/loop_counter' type=Const>,
 <tf.Operation 'while' type=StatelessWhile>,
 <tf.Operation 'Identity' type=Identity>]

In [227]:
@tf.function
def add_10(x):
  for i in tf.range(10):
    x = x + 1
  return x

In [228]:
add_10.get_concrete_function(tf.constant(0)).graph.get_operations()

[<tf.Operation 'x' type=Placeholder>,
 <tf.Operation 'range/start' type=Const>,
 <tf.Operation 'range/limit' type=Const>,
 <tf.Operation 'range/delta' type=Const>,
 <tf.Operation 'range' type=Range>,
 <tf.Operation 'sub' type=Sub>,
 <tf.Operation 'floordiv' type=FloorDiv>,
 <tf.Operation 'mod' type=FloorMod>,
 <tf.Operation 'zeros_like' type=Const>,
 <tf.Operation 'NotEqual' type=NotEqual>,
 <tf.Operation 'Cast' type=Cast>,
 <tf.Operation 'add' type=AddV2>,
 <tf.Operation 'zeros_like_1' type=Const>,
 <tf.Operation 'Maximum' type=Maximum>,
 <tf.Operation 'while/maximum_iterations' type=Const>,
 <tf.Operation 'while/loop_counter' type=Const>,
 <tf.Operation 'while' type=StatelessWhile>,
 <tf.Operation 'Identity' type=Identity>]

In [229]:
# added ex-post: handling variables and other resources in TF functions
counter = tf.Variable(0)

@tf.function
def increment(counter, c=1):
  return counter.assign_add(c)

increment(counter) # counter is now equal to 1
increment(counter) # counter is now equal to 2

<tf.Tensor: shape=(), dtype=int32, numpy=2>

In [230]:
function_def = increment.get_concrete_function(counter).function_def
function_def.signature.input_arg[0]

name: "counter"
type: DT_RESOURCE

In [231]:
counter = tf.Variable(0)

@tf.function
def increment(c=1):
  return counter.assign_add(c)

In [232]:
increment()
increment()

<tf.Tensor: shape=(), dtype=int32, numpy=2>

In [233]:
function_def = increment.get_concrete_function().function_def
function_def.signature.input_arg[0]

name: "assignaddvariableop_resource"
type: DT_RESOURCE

In [234]:
class Counter:
  def __init__(self):
    self.counter = tf.Variable(0)

  @tf.function
  def increment(self, c=1):
    return self.counter.assign_add(c)

In [235]:
c = Counter()
c.increment()
c.increment()

<tf.Tensor: shape=(), dtype=int32, numpy=2>

In [236]:
@tf.function
def add_10(x):
  for i in tf.range(10):
    x += 1
  return x

print(tf.autograph.to_code(add_10.python_function))

def tf__add(x):
    with ag__.FunctionScope('add_10', 'fscope', ag__.ConversionOptions(recursive=True, user_requested=True, optional_features=(), internal_convert_user_code=True)) as fscope:
        do_return = False
        retval_ = ag__.UndefinedReturnValue()

        def get_state():
            return (x,)

        def set_state(vars_):
            nonlocal x
            (x,) = vars_

        def loop_body(itr):
            nonlocal x
            i = itr
            x = ag__.ld(x)
            x += 1
        i = ag__.Undefined('i')
        ag__.for_stmt(ag__.converted_call(ag__.ld(tf).range, (10,), None, fscope), None, loop_body, get_state, set_state, ('x',), {'iterate_names': 'i'})
        try:
            do_return = True
            retval_ = ag__.ld(x)
        except:
            do_return = False
            raise
        return fscope.ret(retval_, do_return)



In [237]:
# added ex-post: shows how to display the autograph code with syntax highlighting
def display_tf_code(func):
  from IPython.display import display, Markdown
  if hasattr(func, "python_function"):
    func = func.python_function
  code = tf.autograph.to_code(func)
  display(Markdown(f'```python\n{code}\n```'))

In [238]:
display_tf_code(add_10)

```python
def tf__add(x):
    with ag__.FunctionScope('add_10', 'fscope', ag__.ConversionOptions(recursive=True, user_requested=True, optional_features=(), internal_convert_user_code=True)) as fscope:
        do_return = False
        retval_ = ag__.UndefinedReturnValue()

        def get_state():
            return (x,)

        def set_state(vars_):
            nonlocal x
            (x,) = vars_

        def loop_body(itr):
            nonlocal x
            i = itr
            x = ag__.ld(x)
            x += 1
        i = ag__.Undefined('i')
        ag__.for_stmt(ag__.converted_call(ag__.ld(tf).range, (10,), None, fscope), None, loop_body, get_state, set_state, ('x',), {'iterate_names': 'i'})
        try:
            do_return = True
            retval_ = ag__.ld(x)
        except:
            do_return = False
            raise
        return fscope.ret(retval_, do_return)

```

In [239]:
# added ex-post: using TF functions w/ tf.keras (or not)

# custom loss function
def my_mse(y_true, y_pred):
  print("Tracing loss my_mse()")
  return tf.reduce_mean(tf.square(y_pred - y_true))

# custom metric function
def my_mae(y_true, y_pred):
  print("Tracing metric my_mae()")
  return tf.reduce_mean(tf.abs(y_pred - y_true))

In [240]:
# custom layer
class MyDense(tf.keras.layers.Layer):
  def __init__(self, units, activation=None, **kwargs):
    super().__init__(**kwargs)
    self.units = units
    self.activation = tf.keras.activations.get(activation)

  def build(self, input_shape):
    self.kernel = self.add_weight(name='kernel',
                                  shape=(input_shape[1], self.units),
                                  initializer='uniform',
                                  trainable=True)
    self.biases = self.add_weight(name='bias',
                                  shape=(self.units,),
                                  initializer='zeros',
                                  trainable=True)

  def call(self, X):
    print("Tracing MyDense.call()")
    return self.activation(X @ self.kernel + self.biases)

In [241]:
tf.keras.utils.set_random_seed(42)

In [242]:
# Custom model
class MyModel(tf.keras.Model):
  def __init__(self, **kwargs):
    super().__init__(**kwargs)
    self.hidden1 = MyDense(30, activation="relu")
    self.hidden2 = MyDense(30, activation="relu")
    self.output_ = MyDense(1)

  def call(self, input):
    print("Tracing MyModel.call()")
    hidden1 = self.hidden1(input)
    hidden2 = self.hidden2(hidden1)
    concat = tf.keras.layers.concatenate([input, hidden2])
    output = self.output_(concat)
    return output

model = MyModel()

In [243]:
model.compile(loss=my_mse, optimizer="nadam", metrics=[my_mae])

In [244]:
model.fit(X_train_scaled, y_train,
          epochs=2,
          validation_data=(X_valid_scaled, y_valid))
model.evaluate(X_test_scaled, y_test)

Epoch 1/2
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Epoch 2/2


[0.4213947057723999, 0.47099101543426514]

In [245]:
tf.keras.utils.set_random_seed(42)

In [246]:
model = MyModel(dynamic=True)

In [247]:
model.compile(loss=my_mse,
              optimizer="nadam",
              metrics=[my_mae])

In [248]:
model.fit(X_train_scaled[:64], y_train[:64],
          epochs=1,
          validation_data=(X_valid_scaled[:64], y_valid[:64]),
          verbose=0)
model.evaluate(X_test_scaled[:64], y_test[:64],
               verbose=0)

Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()




Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()


[5.549154281616211, 2.0611484050750732]

In [249]:
tf.keras.utils.set_random_seed(42)

In [250]:
model = MyModel()

In [251]:
model.compile(loss=my_mse,
              optimizer="nadam",
              metrics=[my_mae],
              run_eagerly=True)

In [252]:
model.fit(X_train_scaled[:64], y_train[:64],
          epochs=1,
          validation_data=(X_valid_scaled[:64], y_valid[:64]),
          verbose=0)
model.evaluate(X_test_scaled[:64], y_test[:64],
               verbose=0)

Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()
Tracing MyModel.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing MyDense.call()
Tracing loss my_mse()
Tracing metric my_mae()


[5.549154281616211, 2.0611484050750732]

In [253]:
# added ex-post: custom optimizers
class MyMomentumOptimizer(tf.keras.optimizers.Optimizer):
    def __init__(self, learning_rate=0.001, momentum=0.9, name="MyMomentumOptimizer", **kwargs):
        """Gradient descent with momentum optimizer."""
        super().__init__(name, **kwargs)
        self._learning_rate = self._build_learning_rate(learning_rate)
        self.momentum = momentum

    def build(self, var_list):
        """Initialize optimizer variables.

        Args:
          var_list: list of model variables to build SGD variables on.
        """
        super().build(var_list)
        if getattr(self, "_built", False):
            return
        self.momentums = []
        for var in var_list:
            self.momentums.append(
                self.add_variable_from_reference(
                    model_variable=var, variable_name="m"
                )
            )
        self._built = True

    def update_step(self, gradient, variable):
        """Update step given gradient and the associated model variable."""
        lr = tf.cast(self.learning_rate, variable.dtype)
        m = None
        var_key = self._var_key(variable)
        momentum = tf.cast(self.momentum, variable.dtype)
        m = self.momentums[self._index_dict[var_key]]
        if m is None:
            variable.assign_add(-gradient * lr)
        else:
            m.assign(-gradient * lr + m * momentum)
            variable.assign_add(m)

    def get_config(self):
        base_config = super().get_config()
        print("Config!")
        return {
            **base_config,
            "learning_rate": self._serialize_hyperparameter(self._learning_rate),
            "momentum": self.momentum,
        }

In [254]:
optimizer = MyMomentumOptimizer()

tf.keras.utils.set_random_seed(42)
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=[8])])
model.compile(loss="mse", optimizer=optimizer)
model.fit(X_train_scaled, y_train, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.src.callbacks.History at 0x7a2cc6ec3460>

In [255]:
optimizer = tf.keras.optimizers.SGD(learning_rate=0.001, momentum=0.9)

tf.keras.utils.set_random_seed(42)
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=[8])])
model.compile(loss="mse", optimizer=optimizer)
model.fit(X_train_scaled, y_train, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.src.callbacks.History at 0x7a2cc87b4d00>

## § 12.5.	Exercises

12) Implement a custom layer that performs _layer normalization_ (we will use this type of layer in § 15):
* The `build()` method should define two trainable weights *α* and *β*, both of shape `input_shape[-1:]` and data type `tf.float32`. *α* should be initialized with 1s, and *β* with 0s.
* The `call()` method should compute the mean_ μ _and standard deviation_ σ _of each instance's features. For this, you can use `tf.nn.moments(inputs, axes=-1, keepdims=True)`, which returns the mean μ and the variance σ<sup>2</sup> of all instances (compute the square root of the variance to get the standard deviation). Then the function should compute and return *α*⊗(*X* - μ)/(σ + ε) + *β*, where ⊗ represents itemwise multiplication (`*`) and ε is a smoothing term (small constant to avoid division by zero, e.g., 0.001).
* Ensure that your custom layer produces the same (or very nearly the same) output as the `tf.keras.layers.LayerNormalization` layer.

In [256]:
class LayerNormalization(tf.keras.layers.Layer):
  def __init__(self, eps=0.001, **kwargs):
    super().__init__(*kwargs)
    self.eps = eps

  def build(self, batch_input_shape):
    self.alpha = self.add_weight(
        name="alpha", shape=batch_input_shape[-1:],
        initializer="ones"
    )
    self.beta = self.add_weight(
        name="beta", shape=batch_input_shape[-1:],
        initializer="zeros"
    )

    def call(self, X):
      mean, variance = tf.nn.moments(X, axes=-1, keepdims=True)
      return self.alpha * (X - mean) / (tf.sqrt(variance + self.eps)) + self.beta

    def get_config(self):
      base_config = super().get_config()
      return {**base_config, "eps": self.eps}

In [257]:
X = X_train.astype(np.float32)

custom_layer_norm = LayerNormalization()
keras_layer_norm = tf.keras.layers.LayerNormalization()

tf.reduce_mean(tf.keras.losses.mean_absolute_error(
    keras_layer_norm(X), custom_layer_norm(X)
))

<tf.Tensor: shape=(), dtype=float32, numpy=202.50676>

In [258]:
tf.keras.utils.set_random_seed(42)
random_alpha = np.random.rand(X.shape[-1])
random_beta = np.random.rand(X.shape[-1])

custom_layer_norm.set_weights([random_alpha, random_beta])
keras_layer_norm.set_weights([random_alpha, random_beta])

tf.reduce_mean(tf.keras.losses.mean_absolute_error(
    keras_layer_norm(X), custom_layer_norm(X)
))

<tf.Tensor: shape=(), dtype=float32, numpy=202.25449>

13) Train a model using a custom training loop to tackle the Fashion MNIST dataset (see § 10):
* Display the epoch, iteration, mean training loss, and mean accuracy over each epoch (updated at each iteration), as well as the validation loss and accuracy at the end of each epoch.
* Try using a different optimizer with a different learning rate for the upper layers and the lower layers.

In [259]:
(X_train_full, y_train_full), (X_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
x_train_full = X_train_full.astype(np.float32) / 255.
X_valid, X_train = X_train_full[:5000], X_train_full[5000:]
y_valid, y_train = y_train_full[:5000], y_train_full[5000:]
X_test = X_test.astype(np.float32) / 255.

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz


In [260]:
tf.keras.utils.set_random_seed(42)

In [261]:
model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=[28, 28]),
    tf.keras.layers.Dense(100, activation="relu"),
    tf.keras.layers.Dense(10, activation="softmax")
])

In [262]:
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = tf.keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = tf.keras.losses.sparse_categorical_crossentropy
mean_loss = tf.keras.metrics.Mean()
metrics = [tf.keras.metrics.SparseCategoricalAccuracy()]

In [263]:
with trange(1, n_epochs + 1, desc="All epochs") as epochs:
  for epoch in epochs:
    with trange(1, n_steps + 1, desc=f"Epoch {epoch}/{n_epochs}") as steps:
      for step in steps:
        X_batch, y_batch = random_batch(X_train, y_train)
        with tf.GradientTape() as tape:
          y_pred = model(X_batch)
          main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
          loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        for variable in model.variables:
          if variable.constraint is not None:
            variable.assign(variable.constraint(variable))
        status = OrderedDict()
        mean_loss(loss)
        status["loss"] = mean_loss.result().numpy()
        for metric in metrics:
          metric(y_batch, y_pred)
          status[metric.name] = metric.result().numpy()
        steps.set_postfix(status)
      y_pred = model(X_valid)
      status["val_loss"] = np.mean(loss_fn(y_valid, y_pred))
      status["val_accuracy"] = np.mean(tf.keras.metrics.sparse_categorical_accuracy(
          tf.constant(y_valid, dtype=np.float32), y_pred
      ))
      steps.set_postfix(status)
    for metric in [mean_loss] + metrics:
      metric.reset_states()

All epochs:   0%|          | 0/5 [00:00<?, ?it/s]

Epoch 1/5:   0%|          | 0/1718 [00:00<?, ?it/s]

Epoch 2/5:   0%|          | 0/1718 [00:00<?, ?it/s]

Epoch 3/5:   0%|          | 0/1718 [00:00<?, ?it/s]

Epoch 4/5:   0%|          | 0/1718 [00:00<?, ?it/s]

Epoch 5/5:   0%|          | 0/1718 [00:00<?, ?it/s]

In [264]:
tf.keras.utils.set_random_seed(42)

In [265]:
lower_layers = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=[28, 28]),
    tf.keras.layers.Dense(100, activation="relu")
])
upper_layers = tf.keras.Sequential([
    tf.keras.layers.Dense(10, activation="softmax")
])
model = tf.keras.Sequential([
    lower_layers, upper_layers
])

In [266]:
lower_optimizer = tf.keras.optimizers.SGD(learning_rate=1e-4)
upper_optimizer = tf.keras.optimizers.Nadam(learning_rate=1e-3)

In [267]:
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
loss_fn = tf.keras.losses.sparse_categorical_crossentropy
mean_loss = tf.keras.metrics.Mean()
metrics = [tf.keras.metrics.SparseCategoricalAccuracy()]

In [268]:
with trange(1, n_epochs + 1, desc="All epochs") as epochs:
  for epoch in epochs:
    with trange(1, n_steps + 1, desc=f"Epoch {epoch}/{n_epochs}") as steps:
      for step in steps:
        X_batch, y_batch = random_batch(X_train, y_train)
        with tf.GradientTape(persistent=True) as tape:
          y_pred = model(X_batch)
          main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
          loss = tf.add_n([main_loss] + model.losses)
        for layers, optimizer in ((lower_layers, lower_optimizer),
                                  (upper_layers, upper_optimizer)):
          gradients = tape.gradient(loss, layers.trainable_variables)
          optimizer.apply_gradients(zip(gradients, layers.trainable_variables))
        del tape
        for variable in model.variables:
          if variable.constraint is not None:
            variable.assign(variable.constraint(variable))
        status = OrderedDict()
        mean_loss(loss)
        status["loss"] = mean_loss.result().numpy()
        for metric in metrics:
          metric(y_batch, y_pred)
          status[metric.name] = metric.result().numpy()
        steps.set_postfix(status)
      y_pred = model(X_valid)
      status["val_loss"] = np.mean(loss_fn(y_valid, y_pred))
      status["val_accuracy"] = np.mean(tf.keras.metrics.sparse_categorical_accuracy(
          tf.constant(y_valid, dtype=np.float32), y_pred
      ))
    for metric in [mean_loss] + metrics:
      metric.reset_states()

All epochs:   0%|          | 0/5 [00:00<?, ?it/s]

Epoch 1/5:   0%|          | 0/1718 [00:00<?, ?it/s]

Epoch 2/5:   0%|          | 0/1718 [00:00<?, ?it/s]

Epoch 3/5:   0%|          | 0/1718 [00:00<?, ?it/s]

Epoch 4/5:   0%|          | 0/1718 [00:00<?, ?it/s]

Epoch 5/5:   0%|          | 0/1718 [00:00<?, ?it/s]