# How to create custom training loops in Keras
Click the image below to read the post online.

<a target="_blank" href="https://www.machinelearningnuggets.com/keras-custom-training-loops"><img src="https://www.machinelearningnuggets.com/ezoimgfmt/digitalpress.fra1.cdn.digitaloceanspaces.com/mhujhsj/2022/07/logho-1.png?ezimgfmt=ng:webp/ngcb1" alt="Open in ML Nuggets"></a>

In [None]:
pip install layer

In [None]:
import layer
# Fetch the Fashion-MNIST train and test datasets through `layer` and convert
# each one with `to_pandas()`; later cells read their "images" and "labels" columns.
mnist_train = layer.get_dataset('layer/fashion_mnist/datasets/fashion_mnist_train').to_pandas()
mnist_test = layer.get_dataset('layer/fashion_mnist/datasets/fashion_mnist_test').to_pandas()

In [None]:
# Spot-check a single entry (label 17) of the training "images" column.
mnist_train["images"][17]

In [None]:
# Spot-check a single entry (label 23) of the test "images" column.
mnist_test["images"][23]

In [None]:
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import tensorflow as tf

In [None]:
import numpy as np
def images_to_np_array(image_column):
    """Stack a column of images into one NumPy array.

    Each element of ``image_column`` must expose ``getdata()`` (a flat pixel
    sequence) and ``size`` (indexed as ``size[0]``, ``size[1]``); presumably
    these are PIL images, where ``size`` is ``(width, height)``.

    Args:
        image_column: Iterable of image objects.

    Returns:
        ``np.ndarray`` holding one ``(size[1], size[0])`` array per image,
        stacked along a new leading axis.
    """
    stacked = []
    for image in image_column:
        rows, cols = image.size[1], image.size[0]
        flat_pixels = np.array(image.getdata())
        stacked.append(flat_pixels.reshape((rows, cols)))
    return np.array(stacked)
# Convert the image columns to arrays; the label columns are used as-is.
train_images = images_to_np_array(mnist_train["images"])
train_labels = mnist_train["labels"]
test_images = images_to_np_array(mnist_test["images"])
test_labels = mnist_test["labels"]

In [None]:
# Expected layout here: (num_images, height, width) — one 2-D array per image.
train_images.shape

In [None]:
# Rescale pixel values by 1/255 (assumes 8-bit intensities — TODO confirm),
# then append a channel axis so each image has shape (28, 28, 1).
train_images = np.expand_dims(train_images / 255.0, axis=-1)
test_images = np.expand_dims(test_images / 255.0, axis=-1)

In [None]:
# After expand_dims the array carries an extra trailing axis of size 1.
train_images.shape

In [None]:
# Pair every training image with its label. Despite the variable name, this
# dataset is not batched yet; `.batch(...)` is applied in a separate cell.
ds_train_batch = tf.data.Dataset.from_tensor_slices((train_images, train_labels))

In [None]:
# Group the training pairs into batches of 32. No shuffle step is applied.
training_data = ds_train_batch.batch(32)

In [None]:
# Same pipeline for the test split: (image, label) pairs, then batches of 32.
ds_test_batch = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
testing_data = ds_test_batch.batch(32)

In [None]:
# Hyperparameters shared across the notebook. CustomBlock reads "activation",
# "classes", "kernel_size", "pool_size" and "dropout" from this dict.
parameters = {
    "shape": 28,
    "activation": "relu",
    "classes": 10,
    "units": 12,
    "optimizer": "adam",
    "epochs": 100,
    "kernel_size": 3,
    "pool_size": 2,
    "dropout": 0.5,
}

In [None]:
class CustomBlock(tf.keras.Model):
  """Convolutional classifier built from two Conv2D/MaxPooling2D stages.

  The stages are followed by Flatten, Dropout and a Dense layer with softmax
  activation, so ``call`` returns per-class probabilities. Kernel size, pool
  size, activation, dropout rate and class count are read from the
  module-level ``parameters`` dict when the instance is created.

  Args:
    filters: Pair ``(filters1, filters2)`` giving the number of filters of
      the first and second convolution.
  """

  def __init__(self, filters):
    super(CustomBlock, self).__init__(name='')
    filters1, filters2 = filters
    kernel_size = (parameters["kernel_size"], parameters["kernel_size"])
    pool_size = (parameters["pool_size"], parameters["pool_size"])
    activation = parameters["activation"]

    self.conv2a = layers.Conv2D(filters=filters1, input_shape=(28, 28, 1), kernel_size=kernel_size, activation=activation)
    self.maxpool1a = layers.MaxPooling2D(pool_size=pool_size)

    self.conv2b = layers.Conv2D(filters2, kernel_size=kernel_size, activation=activation)
    self.maxpool2b = layers.MaxPooling2D(pool_size=pool_size)

    self.flatten1a = layers.Flatten()
    self.dropout1a = layers.Dropout(parameters["dropout"])
    self.dense1a = layers.Dense(parameters["classes"], activation="softmax")

  def call(self, input_tensor, training=None):
    """Run the forward pass and return class probabilities.

    Args:
      input_tensor: Batch of images; the first convolution is declared with
        ``input_shape=(28, 28, 1)``.
      training: Forwarded to the Dropout layer. ``None`` leaves the decision
        to Keras.

    Returns:
      The output of the final softmax Dense layer, one row per input.
    """
    # The convolutions already apply the configured activation, so no extra
    # relu is applied on top of them.
    x = self.conv2a(input_tensor)
    x = self.maxpool1a(x)

    x = self.conv2b(x)
    x = self.maxpool2b(x)

    x = self.flatten1a(x)
    x = self.dropout1a(x, training=training)
    # dense1a has activation="softmax"; applying softmax a second time would
    # flatten the distribution and distort the cross-entropy loss.
    return self.dense1a(x)


In [None]:
# 32 filters for the first convolution, 64 for the second.
model = CustomBlock([32,64])

In [None]:
# Run one random single-image batch through the model and discard the output.
# NOTE(review): presumably this first call is what creates the model's
# weights for the inspection cells that follow — confirm for the Keras
# version in use.
input_shape = (1, 28, 28, 1)
x = tf.random.normal(input_shape)
_ = model(x)

In [None]:
# Shape of the random input batch.
x.shape

In [None]:
# Layer objects tracked by the model.
model.layers

In [None]:
# Number of variable tensors held by the model.
len(model.variables)

In [None]:
# Print Keras' textual summary of the model.
model.summary()

In [None]:
# The model's final Dense layer already applies softmax, so its output is the
# per-class probability distribution and needs no further softmax.
predictions = model(test_images, training=False)
# Show the first five rows as the cell output.
predictions[:5]

In [None]:
# `predictions` was computed from test_images, so it is shown next to
# test_labels (not the training labels).
print("Prediction: {}".format(tf.math.argmax(predictions, axis=1)))
print("    Labels: {}".format(test_labels))

In [None]:
# Predicted class index for the first row only; the [:1] slice keeps the
# leading axis, so the result has one element.
prediction = tf.math.argmax(predictions[:1], axis=1, output_type=tf.int64)
prediction

## Define the loss and gradient functions

In [None]:
# from_logits=False is the Keras default, spelled out here: the loss expects
# probabilities, which is what the model's softmax output layer produces.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)

In [None]:
def loss(model, x, y, training):
  """Evaluate the module-level ``loss_object`` on one batch.

  Args:
    model: Callable invoked as ``model(x, training=training)``.
    x: Model inputs.
    y: Targets, passed to the loss as ``y_true``.
    training: Forwarded to the model; it matters only for layers that behave
      differently in training and inference (e.g. Dropout).

  Returns:
    Whatever ``loss_object`` returns for these targets and predictions.
  """
  predicted = model(x, training=training)
  return loss_object(y_true=y, y_pred=predicted)

# Loss on the full test set in inference mode. In notebook order this runs
# before the training loop, so it reflects the model's state at that point.
l = loss(model, test_images, test_labels, training=False)
print("Loss test: {}".format(l))

In [None]:
def grad(model, inputs, targets):
  """Compute the training-mode loss and its gradients for one batch.

  Args:
    model: Model whose ``trainable_variables`` are differentiated.
    inputs: Batch of model inputs.
    targets: Batch of targets.

  Returns:
    Tuple ``(loss_value, gradients)``; ``gradients`` comes from
    ``tape.gradient`` taken with respect to ``model.trainable_variables``.
  """
  # The forward pass has to run inside the tape so it is recorded.
  with tf.GradientTape() as tape:
    loss_value = loss(model, inputs, targets, training=True)
  gradients = tape.gradient(loss_value, model.trainable_variables)
  return loss_value, gradients

## Create an optimizer

In [None]:
# Adam constructed with no arguments, i.e. the library's default settings.
optimizer = tf.keras.optimizers.Adam()

## Training loop

In [None]:
from tqdm.notebook import trange

In [None]:
## Note: Rerunning this cell uses the same model parameters

# Per-epoch history, kept for plotting.
train_loss_results = []
train_accuracy_results = []

num_epochs = 10

for epoch in trange(num_epochs):
  # Fresh metric objects every epoch, so each result covers one epoch only.
  epoch_loss_avg = tf.keras.metrics.Mean()
  epoch_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()

  # One optimisation step per batch of 32.
  for x, y in training_data:
    loss_value, grads = grad(model, x, y)
    grads_and_vars = zip(grads, model.trainable_variables)
    optimizer.apply_gradients(grads_and_vars)

    # Accumulate the batch loss, then compare predicted and actual labels.
    # training=True matters only for layers that behave differently in
    # training and inference (e.g. Dropout).
    epoch_loss_avg.update_state(loss_value)
    epoch_accuracy.update_state(y, model(x, training=True))

  # End of epoch: record and report the aggregated metrics.
  epoch_loss = epoch_loss_avg.result()
  epoch_acc = epoch_accuracy.result()
  train_loss_results.append(epoch_loss)
  train_accuracy_results.append(epoch_acc)
  print("Epoch {}: Loss: {:.3f}, Accuracy: {:.3%}".format(epoch + 1, epoch_loss, epoch_acc))

## Visualize the loss function over time

In [None]:
# Two stacked panels sharing the epoch axis: loss on top, accuracy below.
fig, (loss_ax, accuracy_ax) = plt.subplots(2, sharex=True, figsize=(12, 8))
fig.suptitle('Training Metrics')

loss_ax.plot(train_loss_results)
loss_ax.set_ylabel("Loss", fontsize=14)

accuracy_ax.plot(train_accuracy_results)
accuracy_ax.set_ylabel("Accuracy", fontsize=14)
accuracy_ax.set_xlabel("Epoch", fontsize=14)
plt.show()

## Evaluate the model on the test dataset

In [None]:
test_accuracy = tf.keras.metrics.Accuracy()

for (x, y) in testing_data:
  # training=False is needed only if there are layers with different
  # behavior during training versus inference (e.g. Dropout).
  # The model ends in a softmax layer, so its outputs are probabilities.
  probabilities = model(x, training=False)
  prediction = tf.math.argmax(probabilities, axis=1, output_type=tf.int64)
  # Keras metrics take their arguments as (y_true, y_pred).
  test_accuracy.update_state(y, prediction)
print("Test set accuracy: {:.3%}".format(test_accuracy.result()))

In [None]:
# Put labels and predictions side by side as two columns. `y` and
# `prediction` hold whatever the most recently executed cell assigned to them
# (the last test batch if the evaluation loop has just run).
tf.stack([y,prediction],axis=1)

## Use the trained model to make predictions

In [None]:
# training=False is needed only if there are layers with different
# behavior during training versus inference (e.g. Dropout).
predictions = model(test_images[0:5], training=False)
class_names = ["T-shirt/top","Trouser","Pullover","Dress","Coat","Sandal","Shirt","Sneaker","Bag","Ankle boot"]
for i, probabilities in enumerate(predictions):
  class_idx = tf.math.argmax(probabilities).numpy()
  # The model's last layer is a softmax, so each row is already a probability
  # distribution; applying softmax again would misreport the confidence.
  p = probabilities[class_idx]
  name = class_names[class_idx]
  print("Image {} prediction: {} ({:4.1f}%)".format(i, name, 100*p))

## Where to go from here
Follow us on [LinkedIn](https://www.linkedin.com/company/mlnuggets), [Twitter](https://twitter.com/ml_nuggets), [GitHub](https://github.com/mlnuggets) and subscribe to our [blog](https://www.machinelearningnuggets.com/#/portal) so that you don't miss a new issue.