# Custom Training Loops in Keras

In [2]:
import os
import warnings
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Flatten, Input
from tensorflow.keras.callbacks import Callback
import numpy as np

warnings.filterwarnings('ignore')

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

## Basic custom training loop

### Dataset

In [3]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(32)

### Define the model

In [4]:
model = Sequential([
  Flatten(input_shape=(28, 28)),
  Dense(128, activation='relu'),
  Dense(10)
])

### Define loss function and optimizer

In [5]:
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()

### Implement the custom training loop

- Iterate over the dataset for a specified number of epochs. 
- Compute the loss and apply gradients to update the model's weights. 

In [6]:
epochs = 2
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(32)
for epoch in range(epochs):
  print(f"Start of epoch {epoch + 1}")

  for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
    with tf.GradientTape() as tape:
      logits = model(x_batch_train, training=True)
      loss_value = loss_fn(y_batch_train, logits)
    
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

    if step % 200 == 0:
      print(f"Epoch {epoch + 1} - Step {step}: Loss = {loss_value.numpy()}")


Start of epoch 1
Epoch 1 - Step 0: Loss = 2.332484245300293
Epoch 1 - Step 200: Loss = 0.34551799297332764
Epoch 1 - Step 400: Loss = 0.1508319228887558
Epoch 1 - Step 600: Loss = 0.14460636675357819
Epoch 1 - Step 800: Loss = 0.14067909121513367
Epoch 1 - Step 1000: Loss = 0.470578670501709
Epoch 1 - Step 1200: Loss = 0.18002387881278992
Epoch 1 - Step 1400: Loss = 0.25883451104164124
Epoch 1 - Step 1600: Loss = 0.22765761613845825
Epoch 1 - Step 1800: Loss = 0.18212977051734924
Start of epoch 2
Epoch 2 - Step 0: Loss = 0.07223241031169891
Epoch 2 - Step 200: Loss = 0.18540161848068237
Epoch 2 - Step 400: Loss = 0.09718567878007889
Epoch 2 - Step 600: Loss = 0.0350455641746521
Epoch 2 - Step 800: Loss = 0.09738698601722717
Epoch 2 - Step 1000: Loss = 0.2148551046848297
Epoch 2 - Step 1200: Loss = 0.08518266677856445
Epoch 2 - Step 1400: Loss = 0.19933189451694489
Epoch 2 - Step 1600: Loss = 0.18989154696464539
Epoch 2 - Step 1800: Loss = 0.09375859797000885


## Adding accuracy metric

Enhance the custom training loop by adding an accuracy metric to monitor model performance.

In [14]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

x_train, x_test = x_train / 255.0, x_test / 255.0 
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(32)

model = Sequential([
  Flatten(input_shape=(28, 28)),
  Dense(128, activation='relu'),
  Dense(10)
])

loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
accuracy_metric = tf.keras.metrics.SparseCategoricalAccuracy()

### Implement the custom training loop with accuracy

In [13]:
epochs = 5
for epoch in range(epochs):
  print(f'Start of epoch {epoch + 1}')

  for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
    with tf.GradientTape() as tape:
      logits = model(x_batch_train, training=True)
      loss_value = loss_fn(y_batch_train, logits)

    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

    accuracy_metric.update_state(y_batch_train, logits)

    if step % 200 == 0:
      print(f"Epoch {epoch + 1} - Step {step}: Loss = {loss_value.numpy()} Accuracy = {accuracy_metric.result().numpy()}")

  accuracy_metric.reset_state()
  

Start of epoch 1
Epoch 1 - Step 0: Loss = 0.04676184803247452 Accuracy = 0.9428402185440063
Epoch 1 - Step 200: Loss = 0.11416544020175934 Accuracy = 0.9444523453712463
Epoch 1 - Step 400: Loss = 0.09451249986886978 Accuracy = 0.9457134008407593
Epoch 1 - Step 600: Loss = 0.045429475605487823 Accuracy = 0.9471529722213745
Epoch 1 - Step 800: Loss = 0.04518900811672211 Accuracy = 0.9483904838562012
Epoch 1 - Step 1000: Loss = 0.11729727685451508 Accuracy = 0.9496224522590637
Epoch 1 - Step 1200: Loss = 0.05716107040643692 Accuracy = 0.950717031955719
Epoch 1 - Step 1400: Loss = 0.07786379754543304 Accuracy = 0.951738715171814
Epoch 1 - Step 1600: Loss = 0.10416413843631744 Accuracy = 0.9526432156562805
Epoch 1 - Step 1800: Loss = 0.030319634824991226 Accuracy = 0.9536569714546204
Start of epoch 2
Epoch 2 - Step 0: Loss = 0.027988862246274948 Accuracy = 1.0
Epoch 2 - Step 200: Loss = 0.06710056960582733 Accuracy = 0.9836753606796265
Epoch 2 - Step 400: Loss = 0.07618517428636551 Accuracy

## Custom callback for advanced logging

In [None]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

x_train, x_test = x_train / 255.0, x_test / 255.0 
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(32)

model = Sequential([
  Flatten(input_shape=(28, 28)),
  Dense(128, activation='relu'),
  Dense(10)
])

loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()
accuracy_metric = tf.keras.metrics.SparseCategoricalAccuracy()

### Implement the custom training loop with custom callback

In [15]:
from tensorflow.keras.callbacks import Callback

class CustomCallback(Callback):
  def on_epoch_end(self, epoch, logs=None):
    logs = logs or {}
    print(f"End of epoch {epoch + 1}, loss: {logs.get('loss')}, accuracy: {logs.get('accuracy')}")


In [16]:
epochs = 2
custom_callback = CustomCallback()

for epoch in range(epochs):
    print(f'Start of epoch {epoch + 1}')
    
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            # Forward pass: Compute predictions
            logits = model(x_batch_train, training=True)
            # Compute loss
            loss_value = loss_fn(y_batch_train, logits)
        
        # Compute gradients
        grads = tape.gradient(loss_value, model.trainable_weights)
        # Apply gradients to update model weights
        optimizer.apply_gradients(zip(grads, model.trainable_weights))
        
        # Update the accuracy metric
        accuracy_metric.update_state(y_batch_train, logits)

        # Log the loss and accuracy every 200 steps
        if step % 200 == 0:
            print(f'Epoch {epoch + 1} Step {step}: Loss = {loss_value.numpy()} Accuracy = {accuracy_metric.result().numpy()}')
    
    # Call the custom callback at the end of each epoch
    custom_callback.on_epoch_end(epoch, logs={'loss': loss_value.numpy(), 'accuracy': accuracy_metric.result().numpy()})
    
    # Reset the metric at the end of each epoch
    accuracy_metric.reset_state()

Start of epoch 1
Epoch 1 Step 0: Loss = 2.3126721382141113 Accuracy = 0.0625
Epoch 1 Step 200: Loss = 0.4393109083175659 Accuracy = 0.8289800882339478
Epoch 1 Step 400: Loss = 0.18942619860172272 Accuracy = 0.8644015192985535
Epoch 1 Step 600: Loss = 0.1980287730693817 Accuracy = 0.8800436854362488
Epoch 1 Step 800: Loss = 0.17768359184265137 Accuracy = 0.8926342129707336
Epoch 1 Step 1000: Loss = 0.4339821934700012 Accuracy = 0.9004745483398438
Epoch 1 Step 1200: Loss = 0.1382765918970108 Accuracy = 0.9072127342224121
Epoch 1 Step 1400: Loss = 0.24768947064876556 Accuracy = 0.9124955534934998
Epoch 1 Step 1600: Loss = 0.20073220133781433 Accuracy = 0.9161266684532166
Epoch 1 Step 1800: Loss = 0.16371320188045502 Accuracy = 0.9203394055366516
End of epoch 1, loss: 0.04219643771648407, accuracy: 0.9223166704177856
Start of epoch 2
Epoch 2 Step 0: Loss = 0.06693096458911896 Accuracy = 1.0
Epoch 2 Step 200: Loss = 0.21642707288265228 Accuracy = 0.96159827709198
Epoch 2 Step 400: Loss = 0.

## Exercise 1

Implement a basic custom training loop to train a simple neural network on the MNIST dataset. 


- Set up the environment and load the dataset. 

- Define the model with a Flatten layer and two Dense layers. 

- Define the loss function and optimizer. 

- Implement a custom training loop to iterate over the dataset, compute the loss, and update the model's weights. 


In [17]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(32)

model = Sequential([
  Flatten(input_shape=(28, 28)),
  Dense(128, activation='relu'),
  Dense(10)
])

loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.keras.optimizers.Adam()

for epoch in range(5):
  for x_batch, y_batch in train_dataset:
    with tf.GradientTape() as tape:
      logits = model(x_batch, training=True)
      loss = loss_fn(y_batch, logits)

    grads = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
  print(f'Epoch {epoch + 1} - Loss: {loss.numpy()}')

Epoch 1 - Loss: 0.04062194377183914
Epoch 2 - Loss: 0.042877212166786194
Epoch 3 - Loss: 0.04047567397356033
Epoch 4 - Loss: 0.033673569560050964
Epoch 5 - Loss: 0.01646129973232746


## Exercise 2: Adding accuracy metric

Objective: Enhance the custom training loop by adding an accuracy metric to monitor model performance. 


1. Set up the environment and define the model, loss function, and optimizer. 

2. Add Sparse Categorical Accuracy as a metric. 

3. Implement the custom training loop with accuracy tracking.

In [18]:
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data() 
x_train = x_train / 255.0 
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(32) 

# Step 2: Define the Model
model = Sequential([ 
    Flatten(input_shape=(28, 28)), 
    Dense(128, activation='relu'), 
    Dense(10) 
]) 

# Step 3: Define Loss Function, Optimizer, and Metric
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) 
optimizer = tf.keras.optimizers.Adam() 
accuracy_metric = tf.keras.metrics.SparseCategoricalAccuracy() 

# Step 4: Implement the Custom Training Loop with Accuracy Tracking
epochs = 5 
for epoch in range(epochs): 
    for x_batch, y_batch in train_dataset: 
        with tf.GradientTape() as tape: 
            logits = model(x_batch, training=True) 
            loss = loss_fn(y_batch, logits) 
        grads = tape.gradient(loss, model.trainable_weights) 
        optimizer.apply_gradients(zip(grads, model.trainable_weights)) 
        accuracy_metric.update_state(y_batch, logits) 
    print(f'Epoch {epoch + 1}: Loss = {loss.numpy()} Accuracy = {accuracy_metric.result().numpy()}') 
    accuracy_metric.reset_state()

Epoch 1: Loss = 0.03837450221180916 Accuracy = 0.9246500134468079
Epoch 2: Loss = 0.044245507568120956 Accuracy = 0.9652500152587891
Epoch 3: Loss = 0.05348261445760727 Accuracy = 0.9758666753768921
Epoch 4: Loss = 0.03362075984477997 Accuracy = 0.982450008392334
Epoch 5: Loss = 0.017705854028463364 Accuracy = 0.987333357334137


## Exercise 3

Implement a custom callback to log additional metrics and information during training. 


1. Set up the environment and define the model, loss function, optimizer, and metric. 

2. Create a custom callback to log additional metrics at the end of each epoch. 

3. Implement the custom training loop with the custom callback. 

In [19]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data() 
x_train = x_train / 255.0 
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(32) 

# Step 2: Define the Model
model = Sequential([ 
    tf.keras.Input(shape=(28, 28)),
    Flatten(), 
    Dense(128, activation='relu'), 
    Dense(10) 
]) 

# Step 3: Define Loss Function, Optimizer, and Metric
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) 
optimizer = tf.keras.optimizers.Adam() 
accuracy_metric = tf.keras.metrics.SparseCategoricalAccuracy() 

# Step 4: Implement the Custom Callback
class CustomCallback(Callback): 
    def on_epoch_end(self, epoch, logs=None): 
        print(f'End of epoch {epoch + 1}, loss: {logs.get("loss")}, accuracy: {logs.get("accuracy")}') 

# Step 5: Implement the Custom Training Loop with Custom Callback
custom_callback = CustomCallback() 

for epoch in range(5): 
    for x_batch, y_batch in train_dataset: 
        with tf.GradientTape() as tape: 
            logits = model(x_batch, training=True) 
            loss = loss_fn(y_batch, logits) 
        grads = tape.gradient(loss, model.trainable_weights) 
        optimizer.apply_gradients(zip(grads, model.trainable_weights)) 
        accuracy_metric.update_state(y_batch, logits) 
    custom_callback.on_epoch_end(epoch, logs={'loss': loss.numpy(), 'accuracy': accuracy_metric.result().numpy()}) 
    accuracy_metric.reset_state()

End of epoch 1, loss: 0.029754186049103737, accuracy: 0.9248999953269958
End of epoch 2, loss: 0.030950594693422318, accuracy: 0.965666651725769
End of epoch 3, loss: 0.045021310448646545, accuracy: 0.9767833352088928
End of epoch 4, loss: 0.048431504517793655, accuracy: 0.9832000136375427
End of epoch 5, loss: 0.02104269526898861, accuracy: 0.987583339214325


## Exercise 4

Add functionality to save the results of each hyperparameter tuning iteration as JSON files in a specified directory. 


Modify the tuning loop to save each iteration's results as JSON files.

Specify the directory where these JSON files will be stored for easier retrieval and analysis of tuning results.

In [25]:
import json
import os
import keras_tuner as kt
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification

# Step 1: Load your dataset
X, y = make_classification(n_samples=1000, n_features=20, n_classes=2)
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2)

# Step 2: Define the model-building function
def build_model(hp):
    model = Sequential()
    # Tune the number of units in the first Dense layer
    model.add(Dense(units=hp.Int('units', min_value=32, max_value=512, step=32),
                    activation='relu'))
    model.add(Dense(1, activation='sigmoid'))  # Binary classification example
    model.compile(optimizer=Adam(hp.Float('learning_rate', 1e-4, 1e-2, sampling='LOG')),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model

# Step 3: Initialize a Keras Tuner RandomSearch tuner
tuner = kt.RandomSearch(
    build_model,
    objective='val_accuracy',
    max_trials=10,
    executions_per_trial=1,
    directory='tuner_results',
    project_name='hyperparam_tuning'
)

# Step 4: Run the tuner search (make sure the data is correct)
tuner.search(X_train, y_train, validation_data=(X_val, y_val), epochs=5)

# Step 5: Save the tuning results as JSON files
try:
    for i in range(10):
        # Fetch the best hyperparameters from the tuner
        best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
        
        # Results dictionary to save hyperparameters and score
        results = {
            "trial": i + 1,
            "hyperparameters": best_hps.values,  # Hyperparameters tuned in this trial
            "score": None  # Add any score or metrics if available
        }

        # Save the results as JSON
        with open(os.path.join('tuner_results', f"trial_{i + 1}.json"), "w") as f:
            json.dump(results, f)

except IndexError:
    print("Tuning process has not completed or no results available.")

Trial 10 Complete [00h 00m 01s]
val_accuracy: 0.9700000286102295

Best val_accuracy So Far: 0.9900000095367432
Total elapsed time: 00h 00m 09s


> __num_trials__ specifies the number of top hyperparameter sets to return. Setting num_trials=1 means that it will return only the best set of hyperparameters found during the tuning process.