# Train an MNIST model with TensorFlow

MNIST is a widely used dataset for handwritten digit classification. It consists of 70,000 labeled 28x28-pixel grayscale images of handwritten digits, split into 60,000 training images and 10,000 test images. There are 10 classes, one for each digit. This tutorial shows how to train a TensorFlow 2 model on MNIST with SageMaker.

## Runtime

This notebook is meant to be run locally within SageMaker Studio. Running locally is suitable only for debugging with small datasets. Once you have determined that the code runs well, you can switch to running the training script on SageMaker infrastructure in a containerized environment.

## Contents

1. [TensorFlow Estimator](#TensorFlow-Estimator)
1. [Implement the training entry point](#Implement-the-training-entry-point)
1. [Set hyperparameters](#Set-hyperparameters)
1. [Set up channels for training and testing data](#Set-up-channels-for-training-and-testing-data)
1. [Run the training script on SageMaker](#Run-the-training-script-on-SageMaker)
1. [Inspect and store model data](#Inspect-and-store-model-data)
1. [Test and debug the entry point before running the training container](#Test-and-debug-the-entry-point-before-running-the-training-container)

In [5]:
import os
import json

import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Conv2D, Dense, Flatten

import sagemaker
from sagemaker.tensorflow import TensorFlow
from sagemaker import get_execution_role

sess = sagemaker.Session()
role = get_execution_role()
bucket = sess.default_bucket()

print ("Sagemaker version = {}".format (sagemaker.__version__))   
print ("S3 bucket for model artifacts = {}".format (bucket))

Sagemaker version = 2.44.0
S3 bucket for model artifacts = sagemaker-us-east-1-937351930975


In [6]:
import logging
import boto3
from botocore.exceptions import ClientError
#
# Download training and testing data from a public S3 bucket
#
def download_from_s3(bucket, data_dir="./data", train=True):
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)

    if train:
        images_file = "train-images-idx3-ubyte.gz"
        labels_file = "train-labels-idx1-ubyte.gz"
    else:
        images_file = "t10k-images-idx3-ubyte.gz"
        labels_file = "t10k-labels-idx1-ubyte.gz"

    s3 = boto3.client("s3")
    for obj in [images_file, labels_file]:
        key = "datasets/image/MNIST/" + obj   # S3 keys always use "/" regardless of the local OS
        dest = os.path.join(data_dir, obj)
        if not os.path.exists(dest):
            s3.download_file(bucket, key, dest)
    return

sample_files_bucket = "sagemaker-sample-files"
download_from_s3 (sample_files_bucket, "./data", True)    # training data
download_from_s3 (sample_files_bucket, "./data", False)   # test data

print ("Completed the download of MNIST training/test data from '{}' S3 bucket".format(sample_files_bucket))

Completed the download of MNIST training/test data from 'sagemaker-sample-files' S3 bucket


In [7]:
import gzip

def load_mnist_data (data_dir, train):
    if train:
        images_file = "train-images-idx3-ubyte.gz"
        labels_file = "train-labels-idx1-ubyte.gz"
    else:
        images_file = "t10k-images-idx3-ubyte.gz"
        labels_file = "t10k-labels-idx1-ubyte.gz"
        
    with gzip.open(os.path.join(data_dir, images_file), "rb") as f_images:
        images = np.frombuffer(f_images.read(), np.uint8, offset=16).reshape(-1, 28, 28)   
        images = images.astype(np.float32)
        
    with gzip.open(os.path.join(data_dir, labels_file), "rb") as f_labels:
        labels = np.frombuffer(f_labels.read(), np.uint8, offset=8).reshape(-1, 1)         

    return (images, labels)
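
The `offset=16` and `offset=8` arguments above skip the header at the start of each IDX file: a 4-byte magic number followed by one 4-byte big-endian size per dimension (three sizes for images, one for labels). The sketch below parses such a header from small synthetic in-memory buffers rather than from the downloaded files. `read_idx_header` is a hypothetical helper for illustration and is not part of the notebook.

```python
import struct

def read_idx_header(raw):
    """Return (magic, dims) parsed from the start of a decompressed IDX buffer."""
    magic = struct.unpack(">I", raw[:4])[0]
    ndim = magic & 0xFF  # the low byte of the magic number is the number of dimensions
    dims = struct.unpack(">" + "I" * ndim, raw[4:4 + 4 * ndim])
    return magic, dims

# Synthetic stand-ins for decompressed files: 2 blank 28x28 images and 2 labels
fake_images = struct.pack(">IIII", 0x00000803, 2, 28, 28) + bytes(2 * 28 * 28)
fake_labels = struct.pack(">II", 0x00000801, 2) + bytes(2)

image_magic, image_dims = read_idx_header(fake_images)
label_magic, label_dims = read_idx_header(fake_labels)

# Header size = 4 bytes of magic + 4 bytes per dimension
image_header_size = 4 + 4 * len(image_dims)
label_header_size = 4 + 4 * len(label_dims)
print(image_dims, image_header_size)
print(label_dims, label_header_size)
```

The computed header sizes (16 bytes for images, 8 bytes for labels) are the offsets that `load_mnist_data` passes to `np.frombuffer`.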

In [8]:
def normalize(x, axis):
    epsilon = np.finfo(float).eps
    mean = np.mean(x, axis=axis, keepdims=True)
    std = np.std(x, axis=axis, keepdims=True) + epsilon
    return (x - mean) / std
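
As a quick check of what `normalize` does when called with `axis=(1, 2)`, the sketch below applies it to a small random batch (synthetic data standing in for MNIST images). It confirms that each image individually, rather than the batch as a whole, ends up with a mean close to 0 and a standard deviation close to 1. The function body is repeated here only so that the snippet runs on its own.

```python
import numpy as np

def normalize(x, axis):
    # Same logic as the notebook cell above
    epsilon = np.finfo(float).eps
    mean = np.mean(x, axis=axis, keepdims=True)
    std = np.std(x, axis=axis, keepdims=True) + epsilon
    return (x - mean) / std

# Synthetic batch of 4 "images" with pixel values in [0, 255)
rng = np.random.default_rng(0)
batch = rng.uniform(0, 255, size=(4, 28, 28)).astype(np.float32)

per_image = normalize(batch, (1, 2))

# One mean and one standard deviation per image
per_image_mean = per_image.mean(axis=(1, 2))
per_image_std = per_image.std(axis=(1, 2))
print(per_image_mean)
print(per_image_std)
```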

In [9]:
data_dir = "./data"

# 
# Load the training and test MNIST datasets as NumPy arrays of samples and labels
#
X_train, y_train = load_mnist_data(data_dir=data_dir, train=True)
X_test, y_test = load_mnist_data(data_dir=data_dir, train=False)
print ("Training data size = {} and {}".format (X_train.shape, y_train.shape))
print ("Test data size = {} and {}".format (X_test.shape, y_test.shape))

Training data size = (60000, 28, 28) and (60000, 1)
Test data size = (10000, 28, 28) and (10000, 1)


In [10]:
#
# Define hyperparameters for TensorFlow training
#
batch_size = 1024         # mini-batch size
epochs = 5                # number of epochs of Gradient Descent
learning_rate = 0.001     # initial learning rate used for Gradient Descent
beta_1 = 0.9              # momentum parameter for Adam Optimizer
beta_2 = 0.999            # rmsprop parameter for Adam Optimizer
dropout = 0.8             # dropout rate (fraction of units dropped during training)
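
To make the roles of `learning_rate`, `beta_1`, and `beta_2` concrete, here is a minimal sketch of one Adam update on a single scalar parameter, written in the textbook form of the algorithm. `adam_step` is a hypothetical helper, the `eps` value is illustrative, and the Keras optimizer may arrange the bias correction and epsilon term differently internally.

```python
import math

def adam_step(theta, grad, m, v, t, lr=0.001, beta_1=0.9, beta_2=0.999, eps=1e-7):
    """One textbook Adam update for a scalar parameter (illustrative only)."""
    m = beta_1 * m + (1.0 - beta_1) * grad          # momentum: running mean of gradients
    v = beta_2 * v + (1.0 - beta_2) * grad * grad   # rmsprop: running mean of squared gradients
    m_hat = m / (1.0 - beta_1 ** t)                 # bias correction for step t (1-based)
    v_hat = v / (1.0 - beta_2 ** t)
    theta = theta - lr * m_hat / (math.sqrt(v_hat) + eps)
    return theta, m, v

theta, m, v = 1.0, 0.0, 0.0
theta, m, v = adam_step(theta, grad=4.0, m=m, v=v, t=1)
print(theta, m, v)
```

On the first step the bias-corrected ratio is approximately the sign of the gradient, so the parameter moves by roughly `learning_rate` regardless of the gradient's magnitude.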

In [11]:
#
# https://www.tensorflow.org/guide/data
# https://www.tensorflow.org/api_docs/python/tf/data/Dataset
# Using TensorFlow APIs, create mini-batches out of the training set by first shuffling it and then splitting it into mini-batches
# tf.data.Dataset.from_tensor_slices: Creates a Dataset by slicing the given tensors along their first dimension. This operation preserves the structure of the input tensors, removing the first dimension of each tensor and using it as the dataset dimension.
# tf.data.Dataset.shuffle: Randomly shuffles the elements of this dataset by filling a buffer with given number of elements from the Dataset. For perfect shuffling, a buffer size greater than or equal to the full size of the dataset is required.
# tf.data.Dataset.batch: Combines consecutive elements of this dataset into batches and returns a BatchDataset so that we can iterate over the batches
#
def generate_mini_batches_tf (X, y, mini_batch_size = 128, seed = 0):
    m = X.shape[0]    
    mini_batches = (tf.data.Dataset.from_tensor_slices((X, y)).shuffle(m,seed).batch(mini_batch_size))
    print ("Number of mini-batches created = {}".format (len(mini_batches)))
    return mini_batches
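
The comment above notes that a perfect shuffle needs a buffer at least as large as the dataset. The pure-Python sketch below is a simplified model of buffer-based shuffling, not TensorFlow's implementation: it fills a buffer, repeatedly emits a random buffered element, and refills the freed slot with the next input element. `buffered_shuffle` is a hypothetical name used only for this illustration.

```python
import random

def buffered_shuffle(items, buffer_size, seed=0):
    """Simplified model of shuffling through a fixed-size buffer."""
    rng = random.Random(seed)
    it = iter(items)

    # Fill the buffer with the first buffer_size elements
    buffer = []
    for x in it:
        buffer.append(x)
        if len(buffer) == buffer_size:
            break

    # Emit a random buffered element and replace it with the next input element
    out = []
    for x in it:
        idx = rng.randrange(len(buffer))
        out.append(buffer[idx])
        buffer[idx] = x

    # Input exhausted: emit the remaining buffered elements in random order
    rng.shuffle(buffer)
    out.extend(buffer)
    return out

no_shuffle = buffered_shuffle(list(range(10)), buffer_size=1)
partial = buffered_shuffle(list(range(10)), buffer_size=3, seed=1)
print(no_shuffle)
print(partial)
```

With a buffer of size 1 the order is unchanged, and with a buffer of size 3 the first emitted element can only be one of the first three inputs. This is why `generate_mini_batches_tf` passes the full dataset size `m` as the buffer size.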

In [28]:
#
# Create mini-batches out of the training set by first shuffling it and then splitting it into mini-batches
# This is a DIY implementation of creating mini-batches from the original dataset
# Random shuffling is done synchronously between X and Y such that after the shuffling the i-th sample in X corresponds to the i-th label in Y. 
# The shuffling step ensures that examples will be split randomly into different mini-batches.
#
import math

def generate_mini_batches (X, Y, mini_batch_size = 128, seed = 0):
    #
    # Step 1: Shuffle (X, Y).
    # Note that we should use a different seed in each epoch to ensure that the samples are shuffled in a different order each time
    # Note that the array X passed into this function is a four dimensional array, after adding a dimension to account for the color channel.
    #
    m = X.shape[0]    
    np.random.seed(seed)  
    permutation = list(np.random.permutation(m))
    shuffled_X = X[permutation, :, :, :]
    shuffled_Y = Y[permutation, :]

    #
    # Step 2: Partition (shuffled_X, shuffled_Y). Minus the end case (last mini-batch < mini_batch_size)
    #
    mini_batches = []
    num_complete_minibatches = math.floor(m/mini_batch_size) 
    start = 0
    end = 0
    for k in range(0, num_complete_minibatches):
        start = k*mini_batch_size
        end = start + mini_batch_size
        mini_batch_X = shuffled_X [start:end, :, :, :]
        mini_batch_Y = shuffled_Y [start:end, :]
        mini_batch = (mini_batch_X, mini_batch_Y)
        mini_batches.append(mini_batch)
    
    #
    # Handling the end case (last mini-batch < mini_batch_size)
    #
    if m % mini_batch_size != 0:
        start = end
        end = m
        mini_batch_X = shuffled_X [start:end, :, :, :]
        mini_batch_Y = shuffled_Y [start:end, :]
        mini_batch = (mini_batch_X, mini_batch_Y)
        mini_batches.append(mini_batch)
    
    print ("Number of mini-batches created = {}".format (len(mini_batches)))
    return mini_batches
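
The partitioning logic above yields `floor(m / mini_batch_size)` full mini-batches plus one smaller batch when `m` is not a multiple of the batch size. The arithmetic can be checked on its own, as in the sketch below; `batch_sizes` is a hypothetical helper that only computes sizes and does not touch any data.

```python
def batch_sizes(m, mini_batch_size):
    """Sizes of the mini-batches produced when m samples are split in order."""
    full, remainder = divmod(m, mini_batch_size)
    sizes = [mini_batch_size] * full
    if remainder:
        sizes.append(remainder)  # the end case: last mini-batch < mini_batch_size
    return sizes

train_sizes = batch_sizes(60000, 1024)
test_sizes = batch_sizes(10000, 1024)
print(len(train_sizes), train_sizes[-1])  # 59 608
print(len(test_sizes), test_sizes[-1])    # 10 784
```

With the notebook's `batch_size = 1024`, this gives 59 training mini-batches and 10 test mini-batches.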

In [29]:
#
# Define the layers of neural network 
#
from tensorflow.keras import Model
from tensorflow.keras.layers import Conv2D, Dense, Flatten, Dropout
from tensorflow.keras.initializers import HeNormal

#
# To create a neural network, use tensorflow.keras.Model as the base class
# https://www.tensorflow.org/api_docs/python/tf/keras/Model
# Once the model is created, you can perform the following tasks:
# 1. configure the model with losses and metrics using model.compile()
# 2. train the model with model.fit()
# 3. use the model to do prediction with model.predict()
#
class SmallConv(Model):
    #
    # When implementing a neural network by subclassing the Model class,
    # one should define the layers in __init__() and should implement the model's forward pass in call() method
    #    
    def __init__(self, rate):
        super(SmallConv, self).__init__()
        initializer = HeNormal(seed=1)        
        self.conv1 = Conv2D(32, 3, activation="relu", kernel_initializer=initializer)
        self.flatten = Flatten()
        self.dense1 = Dense(128, activation="relu", kernel_initializer=initializer)
        self.dense2 = Dense(10, activation='linear', kernel_initializer=initializer)
        self.dropout = Dropout(rate)

    #
    # This method defines the model's forward propagation
    # We can optionally have a boolean argument that can be used to specify a different behavior in training and inference
    # Here, we are using Dropout during training only. Dropout is an approach to regularization where a random subset of units is selected in a hidden layer and their contributions are simply dropped while computing the output of that layer
    #
    def call(self, inputs, training=False):
        x = self.conv1(inputs)
        x = self.flatten(x)
        x = self.dense1(x)
        if training:
            x = self.dropout(x, training=training)
        return self.dense2(x)
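
To illustrate what the `Dropout` layer does to the activations of `dense1`, here is a minimal NumPy sketch of inverted dropout, the scheme described in the Keras `Dropout` documentation: during training each unit is zeroed with probability `rate` and the surviving units are scaled by `1 / (1 - rate)`, while at inference the input passes through unchanged. `dropout_forward` is a hypothetical helper, not the Keras implementation.

```python
import numpy as np

def dropout_forward(x, rate, training, rng):
    """Inverted dropout on an array of activations (illustrative only)."""
    if not training or rate == 0.0:
        return x                                 # inference: pass through unchanged
    keep_prob = 1.0 - rate
    mask = rng.random(x.shape) < keep_prob       # True for units that are kept
    return np.where(mask, x / keep_prob, 0.0)    # rescale kept units, zero the rest

rng = np.random.default_rng(0)
activations = np.ones((2, 8))

train_out = dropout_forward(activations, rate=0.5, training=True, rng=rng)
infer_out = dropout_forward(activations, rate=0.5, training=False, rng=rng)
print(train_out)
print(infer_out)
```

With `rate=0.5` and all-ones input, every training-time output is either 0 or 2, which keeps the expected sum of the activations unchanged.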
    

In [30]:
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import SparseCategoricalAccuracy

def train_builtin (X_train, y_train, X_test, y_test, epochs, batch_size, learning_rate, beta_1, beta_2, dropout_rate):
    #
    # Normalize the training and test samples to mean 0 and std 1
    # Note that we are normalizing across axis 1 & 2 of the samples.
    # That means that we are normalizing across the (28x28) values for each sample and not across all samples
    #
    X_train, X_test = normalize(X_train, (1,2)), normalize(X_test, (1,2))
    
    #
    # The MNIST dataset comprises 28x28 grayscale images
    # The samples were loaded as a three-dimensional array of shape (n_samples, 28, 28)
    # For the data to be compatible with the Conv2D layer in the neural network, we add an extra axis that represents the color channel
    # Conv2D expects four-dimensional input of shape (n_samples, height, width, channels), so the result has shape (n_samples, 28, 28, 1)
    # numpy.expand_dims: Insert a new axis that will appear at the position specified by 'axis' in the expanded array shape.
    #
    X_train = np.expand_dims(X_train, axis=3)
    X_test = np.expand_dims(X_test, axis=3)

    #
    # Create the neural network model
    #
    model = SmallConv(dropout_rate)
    model.compile(
        loss = SparseCategoricalCrossentropy(from_logits=True),
        optimizer = Adam(learning_rate=learning_rate, beta_1=beta_1, beta_2=beta_2),
        metrics = [SparseCategoricalAccuracy()]            
    )
    #
    # Fit the model with the training set
    # Here, we are letting TensorFlow handle all aspects of creating mini-batches and iterating over them in each epoch
    #
    print ("Fitting the Model")
    model.fit(
        X_train, y_train,
        shuffle = True,
        batch_size = batch_size,
        epochs = epochs,
        verbose = 1
    )
    
    #
    # Evaluate the model using the test set
    #
    print ("Evaluating the Model")    
    model.evaluate(
        X_test, y_test,
        batch_size = batch_size,
        verbose = 1        
    )
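
Because `dense2` uses a linear activation, the model outputs raw logits, and the loss is created with `from_logits=True` so that the softmax is applied inside the loss. The NumPy sketch below shows the computation this implies for integer labels of shape `(n, 1)`, averaged over the batch. `sparse_cross_entropy_from_logits` is a hypothetical helper written for illustration and is not the Keras implementation.

```python
import numpy as np

def sparse_cross_entropy_from_logits(labels, logits):
    """Mean softmax cross-entropy for integer class labels (illustrative only)."""
    labels = np.asarray(labels).reshape(-1)                 # (n, 1) -> (n,)
    logits = np.asarray(logits, dtype=np.float64)
    shifted = logits - logits.max(axis=1, keepdims=True)    # subtract max for numerical stability
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))
    per_sample = -log_probs[np.arange(labels.shape[0]), labels]
    return per_sample.mean()

labels = np.array([[3], [7]])

# All-zero logits mean a uniform prediction over the 10 classes
uniform_loss = sparse_cross_entropy_from_logits(labels, np.zeros((2, 10)))

# A large logit on the correct class drives the loss toward 0
confident_logits = np.zeros((2, 10))
confident_logits[0, 3] = 20.0
confident_logits[1, 7] = 20.0
confident_loss = sparse_cross_entropy_from_logits(labels, confident_logits)
print(uniform_loss, confident_loss)
```

An untrained model that predicts all classes equally would score `ln(10)`, roughly 2.30, which is a useful reference point when reading the loss values logged during training.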

In [31]:
#
# When we are training with TensorFlow's built-in methods, all we need to do is create a model and then call model.compile() and model.fit()
# What TensorFlow does when model.fit() is called is fully customizable.
# A core principle of Keras is progressive disclosure of complexity. You should always be able to get into lower-level workflows in a gradual way
#
# Option #1
# https://www.tensorflow.org/guide/keras/customizing_what_happens_in_fit
# When you need to customize what fit() does, you should override the training step function, namely, train_step, of the Model class. 
# This is the function that is called by fit() for every batch of data. 
# You will then be able to call fit() as usual and it will be running your own learning algorithm.
# 
# Option #2
# https://www.tensorflow.org/guide/keras/writing_a_training_loop_from_scratch
# Now, if you want very low-level control over training & evaluation, you should write your own training & evaluation loops from scratch. 
# In this implementation, we are following this option.
#
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import SparseCategoricalAccuracy
from tensorflow.keras.metrics import Mean

def train_custom (X_train, y_train, 
                  X_test, y_test, 
                  epochs, batch_size, learning_rate, beta_1, beta_2, dropout_rate, n_batches = 10):
    #
    # Normalize the training and test samples to mean 0 and std 1
    # Note that we are normalizing across axis 1 & 2 of the samples.
    # That means that we are normalizing across the (28x28) values for each sample and not across all samples
    #
    X_train, X_test = normalize(X_train, (1,2)), normalize(X_test, (1,2))

    #
    # The MNIST dataset comprises 28x28 grayscale images
    # The samples were loaded as a three-dimensional array of shape (n_samples, 28, 28)
    # For the data to be compatible with the Conv2D layer in the neural network, we add an extra axis that represents the color channel
    # Conv2D expects four-dimensional input of shape (n_samples, height, width, channels), so the result has shape (n_samples, 28, 28, 1)
    #
    X, Xt = np.expand_dims(X_train, axis=3), np.expand_dims(X_test, axis=3)
    y, yt = y_train, y_test
    
    #
    # Create mini-batches for the test set. It is not necessary to do this if the test set is much smaller than the training set.
    # Also, we will be using the test set only for inference. Hence, it suffices to create these mini-batches just once and reuse them.
    #
    test_mini_batches = generate_mini_batches_tf (Xt, yt, batch_size)
    #test_mini_batches = generate_mini_batches (Xt, yt, batch_size)

    #
    # Create the neural network model
    # Note that we are compiling the model without specifying a loss function and an optimizer
    # These can be instantiated separately and used in the custom training loop to have more granular control over the training algorithm
    #
    model = SmallConv(dropout_rate)
    model.compile()
    
    #
    # Define loss function and optimizer to be used in Gradient Descent calculations
    #
    loss_fn = SparseCategoricalCrossentropy(from_logits=True)
    optimizer = Adam(learning_rate=learning_rate, beta_1=beta_1, beta_2=beta_2)  
    
    #
    # We can track both training and test loss/accuracy using built-in metrics and calling appropriate methods on them
    # Call metric.update_state() after each batch
    # Call metric.result() when you need to display the current value of the metric
    # Call metric.reset_states() when you need to clear the state of the metric (typically at the end of an epoch)
    #
    train_metric_loss = Mean(name="train_loss")
    train_metric_accuracy = SparseCategoricalAccuracy(name="train_accuracy")
    test_metric_loss = Mean(name="test_loss")
    test_metric_accuracy = SparseCategoricalAccuracy(name="test_accuracy")
    
    #
    # You can use tf.function to make graphs out of your programs. 
    # It is a transformation tool that creates Python-independent dataflow graphs out of your Python code. 
    # This will help you create performant and portable models, and it is required to use SavedModel.
    # Here, we are encapsulating all the operations performed in a training step with a mini-batch of data
    #
    @tf.function
    def train_step_optimized (X_batch, y_batch):
        #
        # Open a GradientTape to record the operations run during the forward pass, which enables auto-differentiation
        # Run the forward pass of the layer and get the predictions (also called logits) from the model
        # Operations that the layer applies to its inputs are going to be recorded on the GradientTape.    
        # Compute the loss value for this mini-batch.
        #
        with tf.GradientTape() as tape:
            logits = model (X_batch, training=True)  
            loss_value = loss_fn (y_batch, logits)

        #
        # Use the gradient tape to automatically retrieve the gradients of the loss with respect to the trainable variables.
        # This one step encapsulates all of the complex back propagation in a neural network.
        # Run one step of gradient descent by updating the value of the variables to minimize the loss.
        # Update the training metric(s) so that we can later query them using result()
        #
        grads = tape.gradient (loss_value, model.trainable_weights)
        optimizer.apply_gradients (zip(grads, model.trainable_weights))
        train_metric_loss.update_state (loss_value)
        train_metric_accuracy.update_state (y_batch, logits)
        return
    
    #
    # Here, we are encapsulating all the operations performed in a test step with a mini-batch of data
    #
    @tf.function    
    def test_step_optimized (X_batch, y_batch):
        logits = model (X_batch, training=False)  
        loss_value = loss_fn (y_batch, logits)
        test_metric_loss.update_state (loss_value)
        test_metric_accuracy.update_state (y_batch, logits)  
        return  
    
    #
    # A custom training loop follows these steps:
    # 1. We open a for loop that iterates over epochs
    # 2. For each epoch, we open a for loop that iterates over the dataset, in batches
    # 3. For each batch, we open a GradientTape() scope
    # 4. Inside this scope, we call the model (forward pass) and compute the loss
    # 5. Outside the scope, we retrieve the gradients of the loss with respect to the weights of the model
    # 6. Finally, we use the optimizer to update the weights of the model based on the gradients    
    #
    seed = 0
    for i in range(epochs):
        print("Start of epoch {}".format(i+1))
        
        #
        # Create minibatches for this epoch
        # Increment the seed so that the dataset is shuffled differently in each epoch
        #
        seed = seed + 1
        train_mini_batches = generate_mini_batches (X, y, batch_size, seed)
        #train_mini_batches = generate_mini_batches_tf (X, y, batch_size, seed)
                
        #
        # Note that we would need to call reset_states() on our metrics between each epoch! 
        # Otherwise calling result() on the metric would return an average since the start of training instead of the per-epoch averages.
        #
        test_metric_accuracy.reset_states()
        test_metric_loss.reset_states()
        train_metric_accuracy.reset_states()
        train_metric_loss.reset_states()        
                   
        step = 0
        for (X_batch, y_batch) in train_mini_batches:        
            step = step + 1
            train_step_optimized(X_batch, y_batch)

            #
            # Log periodically based on the n_batches parameter
            #
            if step % n_batches == 0:
                samples = (step + 1) * batch_size
                accuracy = train_metric_accuracy.result()
                loss = train_metric_loss.result()
                print("Loss at step {} = {} after {} samples; Accuracy = {}".format (step, loss, samples, accuracy))  
                
        #
        # Run a validation against the test set at the end of each epoch
        #
        for (X_batch, y_batch) in test_mini_batches:   
            test_step_optimized(X_batch, y_batch)
                
        #
        # Print loss, accuracy metrics for this epoch for both training and test samples
        #
        print("Epoch {}; Loss = {}; Training Accuracy {}; Test Accuracy {}".format ((i+1), 
                                                                                    train_metric_loss.result(), 
                                                                                    train_metric_accuracy.result(), 
                                                                                    test_metric_accuracy.result()))       
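
The training loop resets its metrics at the start of every epoch because a metric such as `Mean` accumulates state across calls to `update_state()`. The pure-Python sketch below mimics that behaviour with a hypothetical `RunningMean` class (not the Keras class) and made-up loss values, to show how the reported value differs with and without a reset.

```python
class RunningMean:
    """Minimal stand-in for a stateful mean metric (illustrative only)."""

    def __init__(self):
        self.total = 0.0
        self.count = 0

    def update_state(self, value):
        self.total += float(value)
        self.count += 1

    def result(self):
        return self.total / self.count if self.count else 0.0

    def reset_states(self):
        self.total = 0.0
        self.count = 0

# Made-up per-batch losses for two epochs
epoch_losses = [[4.0, 2.0], [1.0, 1.0]]

without_reset = RunningMean()
with_reset = RunningMean()
for losses in epoch_losses:
    with_reset.reset_states()          # clear state at the start of each epoch
    for loss in losses:
        without_reset.update_state(loss)
        with_reset.update_state(loss)

print(without_reset.result())  # 2.0, the average since the start of training
print(with_reset.result())     # 1.0, the average over the last epoch only
```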

In [32]:
epochs = 5
dropout = 0.0
train_builtin (X_train, y_train, X_test, y_test, epochs, batch_size, learning_rate, beta_1, beta_2, dropout)

Fitting the Model
[2023-04-25 16:41:36.927 tensorflow-2-3-cpu-py-ml-t3-medium-dbca98283d57d615662c4efa28c8:57 INFO utils.py:27] RULE_JOB_STOP_SIGNAL_FILENAME: None
[2023-04-25 16:41:37.212 tensorflow-2-3-cpu-py-ml-t3-medium-dbca98283d57d615662c4efa28c8:57 INFO profiler_config_parser.py:102] Unable to find config at /opt/ml/input/config/profilerconfig.json. Profiler is disabled.
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5
Evaluating the Model


In [33]:
epochs = 5
dropout = 0.0
train_custom (X_train, y_train, X_test, y_test, epochs, batch_size, learning_rate, beta_1, beta_2, dropout)

Number of mini-batches created = 10
Start of epoch 1
Number of mini-batches created = 59
Loss at step 10 = 9.448932647705078 after 11264 samples; Accuracy = 0.31718748807907104
Loss at step 20 = 5.545870304107666 after 21504 samples; Accuracy = 0.46479493379592896
Loss at step 30 = 4.031325817108154 after 31744 samples; Accuracy = 0.5448567867279053
Loss at step 40 = 3.214946746826172 after 41984 samples; Accuracy = 0.6015869379043579
Loss at step 50 = 2.702432632446289 after 52224 samples; Accuracy = 0.6426953077316284
Epoch 1; Loss = 2.384625196456909; Training Accuracy 0.6667166948318481; Test Accuracy 0.8177000284194946
Start of epoch 2
Number of mini-batches created = 59
Loss at step 10 = 0.5760658383369446 after 11264 samples; Accuracy = 0.8135741949081421
Loss at step 20 = 0.5615295767784119 after 21504 samples; Accuracy = 0.8194824457168579
Loss at step 30 = 0.5451758503913879 after 31744 samples; Accuracy = 0.8213866949081421
Loss at step 40 = 0.5272332429885864 after 41984 sa