In [1]:
import logging
import boto3
import os
from botocore.exceptions import ClientError
import pathlib
# Where the MNIST sample files live in S3, and the local directory they are copied to.
local_data_dir = "./data"
public_bucket = "sagemaker-sample-files"
# Make sure the local data directory exists (no error if it already does).
path = pathlib.Path(local_data_dir)
path.mkdir(exist_ok=True, parents=True)

# Download training and testing data from a public S3 bucket
def download_from_s3(data_dir="/tmp/data", train=True, bucket=None):
    """Download one split of the gzipped MNIST files from a public S3 bucket.

    The raw ``.gz`` files are only copied to disk; nothing is decoded here.
    Files already present in ``data_dir`` are not downloaded again.

    Args:
        data_dir (str): directory to save the data (created if missing)
        train (bool): download the training split if True, otherwise the
            t10k (test) split
        bucket (str, optional): S3 bucket to read from; when None, the
            module-level ``public_bucket`` is used

    Returns:
        None
    """
    os.makedirs(data_dir, exist_ok=True)

    if train:
        images_file = "train-images-idx3-ubyte.gz"
        labels_file = "train-labels-idx1-ubyte.gz"
    else:
        images_file = "t10k-images-idx3-ubyte.gz"
        labels_file = "t10k-labels-idx1-ubyte.gz"

    if bucket is None:
        bucket = public_bucket

    # download objects
    s3 = boto3.client("s3")
    for obj in [images_file, labels_file]:
        # S3 keys always use "/" as separator; os.path.join would produce "\"
        # on Windows and point at a non-existent key.
        key = f"datasets/image/MNIST/{obj}"
        dest = os.path.join(data_dir, obj)
        if not os.path.exists(dest):
            s3.download_file(bucket, key, dest)


# Fetch both MNIST splits into the local data directory: training first, then test.
download_from_s3(local_data_dir, train=True)
download_from_s3(local_data_dir, train=False)

In [3]:
from __future__ import print_function

import gzip
import json
import logging
import os
import sys
import traceback

import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.layers import Conv2D, Dense, Flatten

# Module logger that writes every record (DEBUG and above) to stdout.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Guard so that re-running this cell does not attach a second stdout handler,
# which would print every message once more per re-run.
if not any(
    isinstance(handler, logging.StreamHandler)
    and getattr(handler, "stream", None) is sys.stdout
    for handler in logger.handlers
):
    logger.addHandler(logging.StreamHandler(sys.stdout))
# Do not also forward records to the root logger. A root handler is configured
# in this environment; the recorded output shows each message twice, once bare
# and once as "INFO:__main__:...".
logger.propagate = False

In [4]:
# Define the model object


class SmallConv(Model):
    """Small convolutional classifier.

    Layers, applied in order:

    - a 3x3 convolution with 32 filters and ReLU
    - flatten
    - a 128-unit ReLU dense layer
    - a 10-unit dense layer with no activation, so outputs are raw scores (logits)
    """

    def __init__(self):
        super().__init__()
        # Attribute names are intentionally unchanged so the saved model's
        # layer layout stays the same.
        self.conv1 = Conv2D(32, 3, activation="relu")
        self.flatten = Flatten()
        self.d1 = Dense(128, activation="relu")
        self.d2 = Dense(10)

    def call(self, x):
        # Feed the input through each layer in sequence.
        for layer in (self.conv1, self.flatten, self.d1, self.d2):
            x = layer(x)
        return x


# Decode and preprocess data
def convert_to_numpy(data_dir, images_file, labels_file):
    """Decode a gzipped IDX image file and label file into numpy arrays.

    IDX headers are big-endian uint32 fields:

    - images: ``(magic=2051, count, rows, cols)``, followed by the pixel bytes
    - labels: ``(magic=2049, count)``, followed by the label bytes

    The image shape is taken from the header rather than hard-coded.
    Malformed, truncated or mismatched files are rejected.

    Args:
        data_dir (str): directory that contains both files
        images_file (str): gzipped IDX3 image file name
        labels_file (str): gzipped IDX1 label file name

    Returns:
        tuple: ``(images, labels)``. ``images`` is a uint8 array of shape
        ``(count, rows, cols)``. ``labels`` is a uint8 array of shape
        ``(count,)``. Both are read-only (``np.frombuffer`` over bytes).

    Raises:
        ValueError: if a header is missing or has the wrong magic number, the
            payload size disagrees with the header, or the image and label
            counts differ.
    """
    with gzip.open(os.path.join(data_dir, images_file), "rb") as f:
        image_bytes = f.read()
    with gzip.open(os.path.join(data_dir, labels_file), "rb") as f:
        label_bytes = f.read()

    if len(image_bytes) < 16:
        raise ValueError(f"{images_file}: too short to contain an IDX3 header")
    if len(label_bytes) < 8:
        raise ValueError(f"{labels_file}: too short to contain an IDX1 header")

    magic, n_images, n_rows, n_cols = (
        int(v) for v in np.frombuffer(image_bytes, dtype=">u4", count=4)
    )
    if magic != 2051:
        raise ValueError(f"{images_file}: bad IDX3 magic number {magic}, expected 2051")
    images = np.frombuffer(image_bytes, np.uint8, offset=16)
    if images.size != n_images * n_rows * n_cols:
        raise ValueError(
            f"{images_file}: header declares {n_images}x{n_rows}x{n_cols} pixels "
            f"but file holds {images.size}"
        )
    images = images.reshape(n_images, n_rows, n_cols)

    magic, n_labels = (int(v) for v in np.frombuffer(label_bytes, dtype=">u4", count=2))
    if magic != 2049:
        raise ValueError(f"{labels_file}: bad IDX1 magic number {magic}, expected 2049")
    labels = np.frombuffer(label_bytes, np.uint8, offset=8)
    if labels.size != n_labels:
        raise ValueError(
            f"{labels_file}: header declares {n_labels} labels but file holds {labels.size}"
        )

    if n_labels != n_images:
        raise ValueError(f"{n_images} images but {n_labels} labels")

    return (images, labels)


def mnist_to_numpy(data_dir, train):
    """Read one raw MNIST split from ``data_dir`` as numpy arrays.

    Args:
        data_dir (str): directory holding the raw gzipped MNIST files.
            This argument can be accessed via SM_CHANNEL_TRAINING
        train (bool): if truthy, load the training split; otherwise the
            t10k (test) split

    Returns:
        tuple of images and labels as numpy array
    """
    split_files = (
        ("train-images-idx3-ubyte.gz", "train-labels-idx1-ubyte.gz")
        if train
        else ("t10k-images-idx3-ubyte.gz", "t10k-labels-idx1-ubyte.gz")
    )
    images_name, labels_name = split_files
    return convert_to_numpy(data_dir, images_name, labels_name)


def normalize(x, axis):
    """Standardize ``x`` to zero mean and unit standard deviation along ``axis``.

    Machine epsilon is added to the standard deviation, so a constant slice
    maps to zeros instead of triggering a division by zero.
    """
    tiny = np.finfo(float).eps
    reduce_kwargs = dict(axis=axis, keepdims=True)
    centered = x - np.mean(x, **reduce_kwargs)
    spread = np.std(x, **reduce_kwargs) + tiny
    return centered / spread


# Training logic


def train():
    """Train ``SmallConv`` on MNIST, log per-epoch metrics, and save the model.

    Configuration comes from notebook-level globals. They are assigned in a
    later cell, which must run before this function is called:

    - ``train_data`` / ``test_data``: directories holding the gzipped MNIST files
    - ``batch_size``, ``epochs``: data-loader batch size and number of epochs
    - ``learning_rate``, ``beta_1``, ``beta_2``: Adam optimizer settings
    - ``model_dir``: output directory; the model is saved to
      ``<model_dir>/00000000``

    Returns:
        None
    """
    # create data loaders from the train / test channels
    x_train, y_train = mnist_to_numpy(data_dir=train_data, train=True)
    x_test, y_test = mnist_to_numpy(data_dir=test_data, train=False)

    x_train, x_test = x_train.astype(np.float32), x_test.astype(np.float32)

    # normalize every image to mean 0 and std 1 (statistics over axes 1 and 2)
    x_train, x_test = normalize(x_train, (1, 2)), normalize(x_test, (1, 2))

    # append a channel axis, (N, H, W) -> (N, H, W, 1):
    # tf uses the channels-last ("depth minor") convention
    x_train, x_test = np.expand_dims(x_train, axis=3), np.expand_dims(x_test, axis=3)

    # wrap the arrays in batched tf.data pipelines; only training data is shuffled
    train_loader = (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .shuffle(len(x_train))
        .batch(batch_size)
    )

    test_loader = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(batch_size)

    model = SmallConv()
    # no loss/optimizer are passed here: the custom loop below drives training
    model.compile()
    # SmallConv's last layer has no activation, so the loss consumes logits
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    optimizer = tf.keras.optimizers.Adam(
        learning_rate=learning_rate, beta_1=beta_1, beta_2=beta_2
    )

    train_loss = tf.keras.metrics.Mean(name="train_loss")
    train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy")

    test_loss = tf.keras.metrics.Mean(name="test_loss")
    test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="test_accuracy")

    @tf.function
    def train_step(images, labels):
        # one optimizer update on a single batch, plus metric accumulation
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss = loss_fn(labels, predictions)
        grad = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grad, model.trainable_variables))

        train_loss(loss)
        train_accuracy(labels, predictions)

    @tf.function
    def test_step(images, labels):
        # forward pass only; accumulates evaluation metrics for one batch
        predictions = model(images, training=False)
        t_loss = loss_fn(labels, predictions)
        test_loss(t_loss)
        test_accuracy(labels, predictions)

    logger.info("Training starts ...")
    for epoch in range(epochs):
        # the metric objects accumulate across calls, so clear them every epoch
        for metric in (train_loss, train_accuracy, test_loss, test_accuracy):
            metric.reset_states()

        for images, labels in train_loader:
            train_step(images, labels)

        logger.info(
            f"Epoch {epoch + 1}, "
            f"Loss: {train_loss.result()}, "
            f"Accuracy: {train_accuracy.result()}, "
        )

        for images, labels in test_loader:
            test_step(images, labels)

        # metric for the hyperparameter tuner
        logger.info(f"Test Loss: {test_loss.result()}")
        logger.info(f"Test Accuracy: {test_accuracy.result()}")

    # Save the model
    # A version number is needed for the serving container
    # to load the model
    version = "00000000"
    ckpt_dir = os.path.join(model_dir, version)
    os.makedirs(ckpt_dir, exist_ok=True)
    model.save(ckpt_dir)



In [5]:
# Hyperparameters read by train()
batch_size = 32
epochs = 1
learning_rate = 1e-3
beta_1 = 0.9
beta_2 = 0.999

# Local stand-ins for the paths a SageMaker training container would supply
# through environment variables (e.g. SM_CHANNEL_TRAINING).
model_dir = "./model"
path = pathlib.Path(model_dir)
path.mkdir(parents=True, exist_ok=True)
# Read the MNIST files from the directory the download cell populated,
# instead of repeating its "./data" literal (single source of truth).
train_data = local_data_dir
test_data = local_data_dir


In [6]:
%%time
# Train and evaluate SmallConv, then save it under model_dir; %%time reports the cell's CPU and wall-clock time.
train()

Training starts ...
[2022-09-21 01:39:51.928 tensorflow-2-3-cpu-py-ml-t3-medium-dbca98283d57d615662c4efa28c8:52 INFO utils.py:27] RULE_JOB_STOP_SIGNAL_FILENAME: None
[2022-09-21 01:39:52.134 tensorflow-2-3-cpu-py-ml-t3-medium-dbca98283d57d615662c4efa28c8:52 INFO profiler_config_parser.py:102] Unable to find config at /opt/ml/input/config/profilerconfig.json. Profiler is disabled.
Epoch 1, Loss: 0.13681690394878387, Accuracy: 0.958816647529602, 


INFO:__main__:Epoch 1, Loss: 0.13681690394878387, Accuracy: 0.958816647529602, 


Test Loss: 0.06367088109254837


INFO:__main__:Test Loss: 0.06367088109254837


Test Accuracy: 0.9787999987602234


INFO:__main__:Test Accuracy: 0.9787999987602234


Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.


Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.


Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.


Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.


INFO:tensorflow:Assets written to: ./model/00000000/assets


INFO:tensorflow:Assets written to: ./model/00000000/assets


CPU times: user 1min 9s, sys: 5.53 s, total: 1min 15s
Wall time: 43.3 s
