In [22]:
from tensorflow import keras
from tensorflow.keras import layers, Input, Model
from tensorflow.keras.layers import Concatenate, Dense, Dropout
from tensorflow.keras.metrics import Metric, SparseCategoricalAccuracy, Mean
from tensorflow.keras.models import load_model
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.optimizers import RMSprop
from keras.utils import plot_model
from tensorflow.keras.datasets import mnist
import numpy as np
import tensorflow as tf
from matplotlib import pyplot as plt

## Writing your own training and evaluation loops

### Low-level usage of metrics

In [4]:
metric = SparseCategoricalAccuracy()
targets = [0, 1, 2]
predictions = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]

metric.update_state(targets, predictions)
current_result = metric.result()

print(f"result: {current_result:.2f}")

result: 1.00


In [6]:
values = [0, 1, 2, 3, 4]
mean_tracker = Mean()

for value in values:
    mean_tracker.update_state(value)
    
print(f"Mean of values: {mean_tracker.result():.2f}")

Mean of values: 2.00


### A complete training and evaluation loop

**Step 1: the training step function**

In [15]:
def get_mnist_model():
    
    inputs = Input(shape=(28 * 28,))
    features = Dense(512, activation="relu")(inputs)
    features = Dropout(0.5)(features)
    outputs = Dense(10, activation="softmax")(features)
    model = Model(inputs, outputs)
    
    return model

In [16]:
(images, labels), (test_images, test_labels) = mnist.load_data()
images = images.reshape((60000, 28 * 28)).astype("float32") / 255
test_images = test_images.reshape((10000, 28 * 28)).astype("float32") / 255.0
train_images, val_images = images[10000:], images[:10000]
train_labels, val_labels = labels[10000:], labels[:10000]

In [17]:
model = get_mnist_model()

loss_fn = SparseCategoricalCrossentropy()
optimizer = RMSprop()
metrics = [SparseCategoricalAccuracy()]
loss_tracking_metric = Mean()

def train_step(inputs, targets):
    
    with tf.GradientTape() as tape:
        predictions = model(inputs, training=True)
        loss = loss_fn(targets, predictions)
    
    gradients = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))

    logs = {}
    
    for metric in metrics:
        metric.update_state(targets, predictions)
        logs[metric.name] = metric.result()

    loss_tracking_metric.update_state(loss)
    logs["loss"] = loss_tracking_metric.result()
    
    return logs

**Writing a step-by-step training loop: resetting the metrics**

In [18]:
def reset_metrics():
    
    for metric in metrics:
        metric.reset_state()
    
    loss_tracking_metric.reset_state()

**Writing a step-by-step training loop: the loop itself**

In [23]:
training_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
training_dataset = training_dataset.batch(32)
epochs = 3

for epoch in range(epochs):
    
    reset_metrics()
    
    for inputs_batch, targets_batch in training_dataset:
        logs = train_step(inputs_batch, targets_batch)
    
    print(f"Results at the end of epoch {epoch}")
    
    for key, value in logs.items():
        print(f"...{key}: {value:.4f}")

Results at the end of epoch 0
...sparse_categorical_accuracy: 0.9666
...loss: 0.1278
Results at the end of epoch 1
...sparse_categorical_accuracy: 0.9710
...loss: 0.1182
Results at the end of epoch 2
...sparse_categorical_accuracy: 0.9733
...loss: 0.1116


**Writing a step-by-step evaluation loop**

In [24]:
def test_step(inputs, targets):
    
    predictions = model(inputs, training=False)
    loss = loss_fn(targets, predictions)
    logs = {}
    
    for metric in metrics:
        metric.update_state(targets, predictions)
        logs["val_" + metric.name] = metric.result()

    loss_tracking_metric.update_state(loss)
    logs["val_loss"] = loss_tracking_metric.result()
    
    return logs

val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_labels))
val_dataset = val_dataset.batch(32)
reset_metrics()

for inputs_batch, targets_batch in val_dataset:
    logs = test_step(inputs_batch, targets_batch)
    
print("Evaluation results:")

for key, value in logs.items():
    print(f"...{key}: {value:.4f}")

Evaluation results:
...val_sparse_categorical_accuracy: 0.9724
...val_loss: 0.1252


### Make it fast with tf.function

**Adding a `tf.function` decorator to turn the code into computation graph**

In [25]:
@tf.function
def test_step(inputs, targets):
    
    predictions = model(inputs, training=False)
    loss = loss_fn(targets, predictions)
    logs = {}
    
    for metric in metrics:
        metric.update_state(targets, predictions)
        logs["val_" + metric.name] = metric.result()

    loss_tracking_metric.update_state(loss)
    logs["val_loss"] = loss_tracking_metric.result()
    
    return logs

val_dataset = tf.data.Dataset.from_tensor_slices((val_images, val_labels))
val_dataset = val_dataset.batch(32)
reset_metrics()

for inputs_batch, targets_batch in val_dataset:
    logs = test_step(inputs_batch, targets_batch)

print("Evaluation results:")

for key, value in logs.items():
    print(f"...{key}: {value:.4f}")

Evaluation results:
...val_sparse_categorical_accuracy: 0.9724
...val_loss: 0.1252


### Leveraging fit() with a custom training loop

**Implementing a custom training step to use with `fit()`**

In [26]:
loss_fn = SparseCategoricalCrossentropy()
loss_tracker = Mean(name="loss")

class CustomModel(Model):
    
    def train_step(self, data):
        
        inputs, targets = data
        
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = loss_fn(targets, predictions)
        
        gradients = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_weights))
        loss_tracker.update_state(loss)
        
        return {"loss": loss_tracker.result()}

    @property
    def metrics(self):
        return [loss_tracker]

In [27]:
inputs = Input(shape=(28 * 28,))
features = Dense(512, activation="relu")(inputs)
features = Dropout(0.5)(features)
outputs = Dense(10, activation="softmax")(features)
model = CustomModel(inputs, outputs)

model.compile(optimizer=RMSprop())
model.fit(train_images, train_labels, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x1ba4bef9ac0>

In [28]:
class CustomModel(keras.Model):
    
    def train_step(self, data):
        
        inputs, targets = data
        
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            loss = self.compiled_loss(targets, predictions)
            
        gradients = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_weights))
        self.compiled_metrics.update_state(targets, predictions)
        
        return {m.name: m.result() for m in self.metrics}

In [29]:
inputs = Input(shape=(28 * 28,))
features = Dense(512, activation="relu")(inputs)
features = Dropout(0.5)(features)
outputs = Dense(10, activation="softmax")(features)
model = CustomModel(inputs, outputs)

model.compile(optimizer=RMSprop(), loss=SparseCategoricalCrossentropy(), metrics=[SparseCategoricalAccuracy()])
model.fit(train_images, train_labels, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x1ba5c185550>