# CIFAR10 Image Classification with TFRecord Dataset

For this exercise notebook, you should be able to use the `Python 3 (TensorFlow 2.6 Python 3.8 CPU Optimized)` kernel on SageMaker Studio, or `conda_tensorflow2_p38` on classic SageMaker Notebook Instances.

---

## Introduction

Your new colleague in the data science team (who isn't very familiar with SageMaker) has written a nice notebook to tackle an image classification problem with Keras: [Local Notebook.ipynb](Local%20Notebook.ipynb).

It works OK with the simple CIFAR10 data set they were working on before, but now they'd like to take advantage of some of the features of SageMaker to tackle bigger and harder challenges.

In [31]:
# Python built-ins
import glob
import math
import os
import random
import re
import shutil
import tempfile
from functools import partial

# Third-party libraries
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sagemaker
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tqdm import tqdm

# TensorFlow Keras
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Conv2D, Dense, Dropout, Flatten, MaxPooling2D
from tensorflow.keras.models import Sequential

## Prepare the Data and Upload to Amazon S3

The primary data source for a SageMaker training job is (nearly) always S3 - so we should upload our training and test data there.

For this exercise, we prepared a few lines of code below to help you upload the images to Amazon S3 using the [aws s3 cp](https://docs.aws.amazon.com/cli/latest/reference/s3/cp.html) CLI command.

But first, let's download the image data from the Registry of Open Data on AWS and sample a subset like we did in the [Local Notebook.ipynb](Local%20Notebook.ipynb).

**Check you understand** what data it's going to upload from this notebook, and where it's going to store it in S3, then start the upload running.


In [5]:
target_path = "/tmp/cifar10"
training_dir = f"{target_path}/train"
testing_dir = f"{target_path}/test"

# Download the CIFAR10 data from the Registry of Open Data on AWS
!rm -rf {target_path}
!mkdir -p {target_path}
!aws s3 cp s3://fast-ai-imageclas/cifar10.tgz {target_path} --no-sign-request

# Un-tar the CIFAR10 data, stripping the leading path element; this will leave us with directories
# {target_path}/train/ and {target_path}/test/
!tar zxf {target_path}/cifar10.tgz -C {target_path}/ --strip-components=1 --no-same-owner
!rm -f {target_path}/cifar10.tgz

# List the image files: each class has its own sub-directory under train/ and test/
train_files = sorted(glob.glob(os.path.join(training_dir, "*/*.png")))
test_files = sorted(glob.glob(os.path.join(testing_dir, "*/*.png")))

print(f"Training files: {len(train_files)}")
print(f"Testing files:  {len(test_files)}")

random.shuffle(train_files)
random.shuffle(test_files)

labels = sorted(os.listdir(training_dir))

download: s3://fast-ai-imageclas/cifar10.tgz to ../../../tmp/cifar10/cifar10.tgz
Training files: 50000
Testing files:  10000


In [8]:
# Prepare local directories that mirror the /opt/ml layout of a SageMaker training container, so that we can run the training script locally.
local_opt_ml_dir = '/tmp/local_opt_ml'
local_input_train_channel_dir = f"{local_opt_ml_dir}/input/data/train"
local_input_test_channel_dir = f"{local_opt_ml_dir}/input/data/test"
local_model_dir = f"{local_opt_ml_dir}/model"
local_output_dir = f"{local_opt_ml_dir}/output"

!rm -rf {local_opt_ml_dir}
!mkdir -p {local_input_train_channel_dir}
!mkdir -p {local_input_test_channel_dir}
!mkdir -p {local_model_dir}
!mkdir -p {local_output_dir}
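
SageMaker training containers expose each input channel under `/opt/ml/input/data/<channel>` and expect the trained model under `/opt/ml/model`. As a minimal sketch, the same local layout could also be built in plain Python. The helper name `make_local_opt_ml` and the temporary root directory are assumptions made for illustration; they are not part of the original notebook.

```python
import tempfile
from pathlib import Path


def make_local_opt_ml(root):
    """Create a directory tree that mimics the container's /opt/ml layout.

    Hypothetical helper: the notebook itself uses shell `mkdir -p` commands.
    """
    root = Path(root)
    layout = {
        "train": root / "input" / "data" / "train",  # 'train' channel
        "test": root / "input" / "data" / "test",    # 'test' channel
        "model": root / "model",                     # saved model artifacts
        "output": root / "output",                   # any other outputs
    }
    for path in layout.values():
        path.mkdir(parents=True, exist_ok=True)
    return layout


# Use a throwaway directory so the sketch does not touch /tmp/local_opt_ml
demo_root = tempfile.mkdtemp()
demo_layout = make_local_opt_ml(demo_root)
for name, path in demo_layout.items():
    print(name, "->", path.relative_to(demo_root))
```

Keeping the local tree in the same shape as the container means the training script can be run unchanged in both places, with only the root path differing.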

## Convert *.png files to TFRecord files

In [10]:
def _bytestring_feature(image):
    # PNG-encode the decoded image tensor and store the resulting bytes
    return tf.train.Feature(
        bytes_list=tf.train.BytesList(value=[tf.io.encode_png(image).numpy()])
    )

def _int_feature(list_of_ints):  # int64
    return tf.train.Feature(int64_list=tf.train.Int64List(value=list_of_ints))

def _float_feature(list_of_floats):  # float32
    return tf.train.Feature(float_list=tf.train.FloatList(value=list_of_floats))

def create_example(image, label):
    feature = {
        "image": _bytestring_feature(image),  # one PNG-encoded image in the list
        "class": _int_feature([label]),       # one class in the list
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))


In [22]:
# Convert the *.png files to a single TFRecord file
def convert_png_to_tfrecord(image_files, target_file):
    with tf.io.TFRecordWriter(target_file) as writer:
        for file in image_files:
            category = file.split('/')[-2]  # the parent directory name is the class name
            label = labels.index(category)
            image = tf.io.decode_png(tf.io.read_file(file))
            example = create_example(image, label)
            writer.write(example.SerializeToString())

train_tfrec_filename = f'{local_input_train_channel_dir}/cifar10-train.tfrec'
convert_png_to_tfrecord(train_files, train_tfrec_filename)

test_tfrec_filename = f'{local_input_test_channel_dir}/cifar10-test.tfrec'
convert_png_to_tfrecord(test_files, test_tfrec_filename)
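
The conversion relies on the directory layout to recover labels: the parent directory of each image is the class name, and its position in the sorted list of class directories becomes the integer label. A small standalone sketch of that mapping follows. The function name `label_from_path`, the three-class list, and the file name `0001.png` are hypothetical and only serve the illustration.

```python
import os


def label_from_path(path, class_names):
    """Return the integer label for an image stored as <root>/<class>/<file>.png."""
    category = os.path.basename(os.path.dirname(path))
    return class_names.index(category)


# Hypothetical subset of classes, sorted the same way the notebook sorts `labels`
demo_labels = sorted(["truck", "airplane", "cat"])
demo_index = label_from_path("/tmp/cifar10/train/cat/0001.png", demo_labels)
print(demo_labels, demo_index)  # ['airplane', 'cat', 'truck'] 1
```

Because the index depends on the sorted order of directory names, the same `labels` list has to be used later to turn predicted class indices back into names.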


In [23]:
!ls -lah {local_input_train_channel_dir}

total 111M
drwxr-xr-x 2 root root   33 Aug  3 03:54 .
drwxr-xr-x 4 root root   31 Aug  3 03:26 ..
-rw-r--r-- 1 root root 111M Aug  3 04:02 cifar10-train.tfrec


In [24]:
!ls -lah {local_input_test_channel_dir}

total 23M
drwxr-xr-x 2 root root  32 Aug  3 04:02 .
drwxr-xr-x 4 root root  31 Aug  3 03:26 ..
-rw-r--r-- 1 root root 23M Aug  3 04:02 cifar10-test.tfrec


## Read the TFRecord files and prepare for model training

In [35]:
# Configurable variables
AUTO = tf.data.experimental.AUTOTUNE # used in tf.data.Dataset API
BATCH_SIZE = 128
INPUT_SHAPE = (32, 32, 3)
EPOCHS = 5

In [26]:
training_filenames = tf.io.gfile.glob(f"{local_input_train_channel_dir}/*.tfrec")
validation_filenames = tf.io.gfile.glob(f"{local_input_test_channel_dir}/*.tfrec")

In [29]:
def read_tfrecord(example):
    features = {
        "image": tf.io.FixedLenFeature([], tf.string),  # tf.string = bytestring (not text string)
        "class": tf.io.FixedLenFeature([], tf.int64),   # shape [] means scalar
    }
    # decode the TFRecord
    example = tf.io.parse_single_example(example, features)
    
    image = tf.image.decode_png(example['image'], channels=3)
    image = tf.cast(image, tf.float32) / 255.0
    # image = tf.reshape(image, [32,32, 3])
    
    class_label = tf.cast(example['class'], tf.int32)
    
    return image, class_label
 

In [30]:

def get_batched_dataset(filenames):
    option_no_order = tf.data.Options()
    option_no_order.experimental_deterministic = False

    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.with_options(option_no_order)
    dataset = dataset.interleave(tf.data.TFRecordDataset, cycle_length=16, num_parallel_calls=AUTO)
    dataset = dataset.map(read_tfrecord, num_parallel_calls=AUTO)

    dataset = dataset.shuffle(2048)
    dataset = dataset.batch(BATCH_SIZE, drop_remainder=True) # drop_remainder will be needed on TPU
    dataset = dataset.prefetch(AUTO)  # prepare upcoming batches while the current one is being consumed

    return dataset
  
def get_training_dataset():
    return get_batched_dataset(training_filenames)

def get_validation_dataset():
    return get_batched_dataset(validation_filenames)
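
Because `drop_remainder=True` discards the final partial batch, the number of batches per epoch is the floor of the dataset size divided by the batch size. The arithmetic below is a small sketch using the file counts reported earlier (50000 training and 10000 test images) and `BATCH_SIZE = 128`; the helper name `batches_per_epoch` is an assumption for illustration.

```python
def batches_per_epoch(num_examples, batch_size, drop_remainder=True):
    """Number of batches a dataset yields per pass over the data."""
    full, leftover = divmod(num_examples, batch_size)
    if leftover and not drop_remainder:
        return full + 1  # the final, smaller batch is kept
    return full


train_steps = batches_per_epoch(50000, 128)
val_steps = batches_per_epoch(10000, 128)
val_skipped = 10000 - val_steps * 128

print(train_steps)  # 390
print(val_steps)    # 78
print(val_skipped)  # 16
```

One consequence is that validation metrics computed through this pipeline skip a handful of images per pass. If exact evaluation over all test images matters, a separate validation pipeline without `drop_remainder` (and without shuffling) may be preferable.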


In [34]:
K.clear_session()

model = Sequential()

model.add(Conv2D(32, (3, 3), activation='relu', padding='same', input_shape=INPUT_SHAPE))
model.add(Conv2D(32, (3, 3), activation='relu', padding='same'))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(64, (3, 3), activation='relu', padding='same'))
model.add(Conv2D(64, (3, 3), activation='relu', padding='same'))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(128, (3, 3), activation='relu', padding='same'))
model.add(Conv2D(128, (3, 3), activation='relu', padding='same'))
model.add(MaxPooling2D((2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(len(labels), activation='softmax'))

model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy']
)

In [36]:
history = model.fit(
    get_training_dataset(), 
    epochs=EPOCHS,
    validation_data=get_validation_dataset(), 
    verbose=2
)

Epoch 1/5
Extension horovod.torch has not been built: /usr/local/lib/python3.8/site-packages/horovod/torch/mpi_lib/_mpi_lib.cpython-38-x86_64-linux-gnu.so not found
If this is not expected, reinstall Horovod with HOROVOD_WITH_PYTORCH=1 to debug the build error.
[2022-08-03 04:17:13.952 tensorflow-2-6-cpu-py3-ml-m5-large-b6c8fce23e41089ce72f77da84d4:1948 INFO utils.py:27] RULE_JOB_STOP_SIGNAL_FILENAME: None
[2022-08-03 04:17:14.007 tensorflow-2-6-cpu-py3-ml-m5-large-b6c8fce23e41089ce72f77da84d4:1948 INFO profiler_config_parser.py:111] Unable to find config at /opt/ml/input/config/profilerconfig.json. Profiler is disabled.
390/390 - 125s - loss: 1.7605 - accuracy: 0.3461 - val_loss: 1.3681 - val_accuracy: 0.4959
Epoch 2/5
390/390 - 123s - loss: 1.2879 - accuracy: 0.5377 - val_loss: 1.0565 - val_accuracy: 0.6188
Epoch 3/5
390/390 - 126s - loss: 1.0695 - accuracy: 0.6246 - val_loss: 0.9279 - val_accuracy: 0.6730
Epoch 4/5
390/390 - 128s - loss: 0.9397 - accuracy: 0.6713 - val_loss: 0.8354 

In [37]:
metrics = model.evaluate(get_validation_dataset()) 
metrics



[0.7641528844833374, 0.7303686141967773]

### SageMaker Section



In [39]:
import sagemaker
from sagemaker.tensorflow import TensorFlow

In [40]:
role = sagemaker.get_execution_role()
sess = sagemaker.Session()
bucket_name = sess.default_bucket()  # We'll just use the default bucket as the other examples did

#### Verify the model training script locally

In [45]:
!python3 src/main_tfrec.py --train {local_input_train_channel_dir} --test {local_input_test_channel_dir} --output-data-dir {local_output_dir} --model-dir {local_model_dir} --epochs=1 --batch-size=128

2022-08-03 04:38:29.218651: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:460] Initializing the SageMaker Profiler.
2022-08-03 04:38:29.218769: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:105] SageMaker Profiler is not enabled. The timeline writer thread will not be started, future recorded events will be dropped.
2022-08-03 04:38:29.245473: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:460] Initializing the SageMaker Profiler.
Namespace(batch_size=128, epochs=1, model_dir='/tmp/local_opt_ml/model', output_data_dir='/tmp/local_opt_ml/output', test='/tmp/local_opt_ml/input/data/test', train='/tmp/local_opt_ml/input/data/train')
Loading dataset...
training dataset: ['/tmp/local_opt_ml/input/data/train/cifar10-train.tfrec']
2022-08-03 04:38:30.600142: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical 
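
The `Namespace(...)` line in the output above shows which arguments the script accepts. The contents of `src/main_tfrec.py` are not shown here, so the following is only a sketch of how such a parser might look, assuming it falls back to the standard SageMaker environment variables (`SM_CHANNEL_TRAIN`, `SM_CHANNEL_TEST`, `SM_MODEL_DIR`, `SM_OUTPUT_DATA_DIR`) when an argument is not given on the command line.

```python
import argparse
import os


def parse_args(argv=None):
    """Parse training arguments (a sketch, not the actual main_tfrec.py)."""
    parser = argparse.ArgumentParser()

    # Hyperparameters
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--batch-size", type=int, default=128)

    # Data and output locations; inside a training job the SM_* variables are set
    parser.add_argument("--train", default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", default=os.environ.get("SM_CHANNEL_TEST"))
    parser.add_argument("--model-dir", default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--output-data-dir", default=os.environ.get("SM_OUTPUT_DATA_DIR"))

    # parse_known_args ignores any extra arguments the platform may pass
    args, _unknown = parser.parse_known_args(argv)
    return args


demo_args = parse_args([
    "--train", "/tmp/local_opt_ml/input/data/train",
    "--test", "/tmp/local_opt_ml/input/data/test",
    "--epochs", "1",
    "--batch-size", "128",
])
print(demo_args.train, demo_args.epochs, demo_args.batch_size)
```

With this pattern the same script runs locally with explicit paths and in a training job with no path arguments at all.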

In [41]:
s3_train_uri = f"s3://{bucket_name}/cifar10/tfrecords/train"
s3_test_uri = f"s3://{bucket_name}/cifar10/tfrecords/test"

In [42]:
# upload data to S3
!aws s3 sync --quiet --delete {local_input_train_channel_dir} {s3_train_uri}

!aws s3 sync --quiet --delete {local_input_test_channel_dir} {s3_test_uri}


In [44]:
# Set up the input channels for model training in a SageMaker training job
input_mode = 'FastFile'  # 'FastFile' streams from S3 on demand; it suits a few large TFRecord files better than many small image files

train_channel = sagemaker.inputs.TrainingInput(s3_train_uri, input_mode=input_mode)
test_channel = sagemaker.inputs.TrainingInput(s3_test_uri, input_mode=input_mode)

inputs = { "train": train_channel, "test": test_channel }

In [53]:
estimator = TensorFlow(
    role=role,  # IAM role to run the job under - we just use the same as the notebook role
    
    # Framework setup:
    entry_point="main_tfrec.py",  # Target script
    source_dir="./src",  # Folder to bundle, in case we want to split the code between files
    framework_version="2.5",  # TensorFlow version
    py_version="py37",  # The time to migrate away from Python 2 has long ago passed!

    # Infrastructure provisioning:
    instance_count=1,  # We haven't implemented parallelization in our script
    instance_type="ml.m5.xlarge",  # CPU instance; a GPU type such as 'ml.g4dn.xlarge' should speed up Keras training
    max_run=20*60, # The training shouldn't take too long to run
    # use_spot_instances=True,  # May as well use spot to save money
    # max_wait=40*60,  # ...And we don't want to wait for ages for spot instances
    
    # Parameters to pass to our script:
    hyperparameters={
        "epochs": 5, 
        "batch-size": 256,
    },
    
    # Performance/progress metrics to scrape from console output:
    metric_definitions=[
        { "Name": "loss", "Regex": "loss: ([0-9\\.]+)" },
        { "Name": "accuracy", "Regex": "accuracy: ([0-9\\.]+)" },
        { "Name": "test:loss", "Regex": "Test.*loss=([0-9\\.]+)" },
        { "Name": "test:accuracy", "Regex": "Test.*accuracy=([0-9\\.]+)" },
    ],
    
    # Let's keep our SageMaker records tidy by giving the training jobs a sensible name
    base_job_name="cifar10-keras-tfrec",
)
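
Metric definitions are regular expressions applied to the job's console output, so it can help to try candidate patterns against a sample log line before launching a job. The sketch below uses the epoch summary line that Keras printed earlier in this notebook. The helper `scrape` is hypothetical, and the exact log format emitted by the training script is an assumption.

```python
import re

# Epoch summary line as printed by Keras with verbose=2 earlier in this notebook
sample = (
    "390/390 - 125s - loss: 1.7605 - accuracy: 0.3461 "
    "- val_loss: 1.3681 - val_accuracy: 0.4959"
)


def scrape(pattern, text):
    """Return every captured number for a pattern, as floats."""
    return [float(value) for value in re.findall(pattern, text)]


loose_loss = scrape(r"loss: ([0-9\.]+)", sample)         # also matches val_loss
strict_loss = scrape(r"- loss: ([0-9\.]+)", sample)      # training loss only
strict_acc = scrape(r"- accuracy: ([0-9\.]+)", sample)   # training accuracy only
short_acc = scrape(r"acc: ([0-9\.]+)", sample)           # no 'acc: ' in this line

print(loose_loss)   # [1.7605, 1.3681]
print(strict_loss)  # [1.7605]
print(strict_acc)   # [0.3461]
print(short_acc)    # []
```

A pattern only produces a metric if its literal prefix appears in the log, and an unanchored prefix such as `loss: ` also captures `val_loss: `. Anchoring on the preceding `- ` is one way to separate training and validation values.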


#### Kick off the SageMaker training job

In [None]:
estimator.fit(inputs)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: latest.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker:Creating training-job with name: cifar10-keras-tfrec-2022-08-03-04-44-01-816


2022-08-03 04:44:02 Starting - Starting the training job...
2022-08-03 04:44:26 Starting - Preparing the instances for trainingProfilerReport-1659501842: InProgress
......
2022-08-03 04:45:26 Downloading - Downloading input data...
2022-08-03 04:45:46 Training - Downloading the training image..