<h2> PipeMode using Tensorflow Estimators

PipeMode is a new feature for training large scale datasets in Sagemaker. Without having to load the data into the local machine, Sagemaker now allows training a model directly using data staged in S3. This is perfectly suited for data that doesn't fit in memory, and is built on top of Tensorflow's data.dataset API.

The Legacy PipeMode will soon be deprecated, as there is a new script mode option which gives customers more flexibility to run their python scripts in pipe mode and build and bring their own models into pipe mode. This sample notebook and accompanying script file shows how to run Tensorflow Estimators with PipeMode in script mode.

Sample data comes from MNIST

In [1]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.session import Session

# S3 bucket for saving code and model artifacts.
# Feel free to specify a different bucket here if you wish.
sagemaker_session = sagemaker.Session()
import sagemaker

bucket = sagemaker.Session().default_bucket() # we are using a default bucket here but you can change it to any bucket in your account
prefix = 'sagemaker/mnist-pipemode' # you can customize the prefix (subfolder) here

role = sagemaker.get_execution_role() # we are using the notebook instance role for training in this example

In [4]:
custom_code_upload_location = 's3://{}/{}/customcode/tensorflow_pipemode'.format(bucket, prefix)
print(custom_code_upload_location)

s3://sagemaker-us-east-2-389535300735/sagemaker/mnist-pipemode/customcode/tensorflow_pipemode


In [2]:
import boto3
from sagemaker import tensorflow
import utils_sharded
from tensorflow.contrib.learn.python.learn.datasets import mnist
import tensorflow as tf

In [8]:
data_sets = mnist.read_data_sets('data', dtype=tf.uint8, reshape=False, validation_size=5000, one_hot=False)

Instructions for updating:
Please use alternatives such as official/mnist/dataset.py from tensorflow/models.
Instructions for updating:
Please write your own downloading logic.
Instructions for updating:
Please use tf.data to implement this functionality.
Extracting data/train-images-idx3-ubyte.gz
Instructions for updating:
Please use tf.data to implement this functionality.
Extracting data/train-labels-idx1-ubyte.gz
Extracting data/t10k-images-idx3-ubyte.gz
Extracting data/t10k-labels-idx1-ubyte.gz
Instructions for updating:
Please use alternatives such as official/mnist/dataset.py from tensorflow/models.


In [9]:
utils_sharded.process_data(data_sets.train, 'train', 'data/train', num_shards=5)
utils_sharded.process_data(data_sets.validation, 'validation', 'data/val',num_shards=5)
utils_sharded.process_data(data_sets.test, 'test', 'data/test',num_shards=5)

Writing data/train/train-1.tfrecords
Writing data/train/train-2.tfrecords
Writing data/train/train-3.tfrecords
Writing data/train/train-4.tfrecords
Writing data/train/train-5.tfrecords
Writing data/val/validation-1.tfrecords
Writing data/val/validation-2.tfrecords
Writing data/val/validation-3.tfrecords
Writing data/val/validation-4.tfrecords
Writing data/val/validation-5.tfrecords
Writing data/test/test-1.tfrecords
Writing data/test/test-2.tfrecords
Writing data/test/test-3.tfrecords
Writing data/test/test-4.tfrecords
Writing data/test/test-5.tfrecords


In [10]:
inputs = sagemaker.Session().upload_data(path='data', bucket=bucket, key_prefix=prefix+'/data/mnist')
print(inputs)

s3://sagemaker-us-east-2-389535300735/sagemaker/mnist-pipemode/data/mnist


In [7]:
!pygmentize pipemode_MNIST.py

[37m#!/usr/bin/env python3[39;49;00m
[37m# -*- coding: utf-8 -*-[39;49;00m
[33m"""[39;49;00m
[33mCreated on Tue Jun  4 12:30:50 2019[39;49;00m
[33m[39;49;00m
[33m@author: stenatu[39;49;00m
[33m"""[39;49;00m

[37m#!/usr/bin/env python3[39;49;00m
[37m# -*- coding: utf-8 -*-[39;49;00m
[33m"""[39;49;00m
[33mCreated on Thu May 30 14:31:44 2019[39;49;00m
[33m[39;49;00m
[33m@author: stenatu[39;49;00m
[33m"""[39;49;00m

[37m#!/usr/bin/env python3[39;49;00m
[37m# -*- coding: utf-8 -*-[39;49;00m
[33m"""[39;49;00m
[33mCreated on Tue May 28 09:19:09 2019[39;49;00m
[33m[39;49;00m
[33m@author: stenatu[39;49;00m
[33m[39;49;00m
[33mIn this code I show how to modify PipeMode dataset to work with script mode,[39;49;00m
[33msince the py2 version of PipeMode (and all the notebook examples in the documentation)[39;49;00m
[33mwill be deprecated shortly.[39;49;00m
[33m[39;49;00m
[33m"""[39;49;00m
[34mimport[39;49;00m [04m[36m

In [11]:
from sagemaker.tensorflow import TensorFlow

#NUM_EPOCHS   = 5
BATCH_SIZE   = 30
#INPUT_MODE   = 'Pipe' # Can try it with 'File' mode as well
#num_train_samples = 500 # replace with num data samples
#num_val_samples = 500 
#num_test_samples= 500
STEPS = 1000

hyperparameters = {'batch_size': BATCH_SIZE,
                   'steps': STEPS,
                    'model_dir': custom_code_upload_location}

tensorflow = TensorFlow(entry_point='pipemode_MNIST.py',
                        role = role,
                        output_path = custom_code_upload_location,
                        hyperparameters = hyperparameters,
                        framework_version='1.12.0',
                        py_version = 'py3',
                        input_mode='Pipe',
                        train_instance_count=1,
                        train_instance_type='ml.c4.xlarge',
                       script_mode = True)

In [12]:
%%time
import boto3


# use the region-specific sample data bucket
remote_inputs = {'train' : inputs + '/train', 
                 'eval'   : inputs +'/val', 
                 'test'  : inputs +'/test'}
tensorflow.fit(remote_inputs, wait=True)


2019-06-04 21:52:38 Starting - Starting the training job...
2019-06-04 21:52:41 Starting - Launching requested ML instances......
2019-06-04 21:53:43 Starting - Preparing the instances for training...
2019-06-04 21:54:39 Downloading - Downloading input data
2019-06-04 21:54:39 Training - Training image download completed. Training in progress...
[31m2019-06-04 21:54:42,886 sagemaker-containers INFO     Imported framework sagemaker_tensorflow_container.training[0m
[31m2019-06-04 21:54:42,893 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)[0m
[31m2019-06-04 21:54:43,201 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)[0m
[31m2019-06-04 21:54:43,217 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)[0m
[31m2019-06-04 21:54:43,229 sagemaker-containers INFO     Invoking user script
[0m
[31mTraining Env:
[0m
[31m{
    "additional_framework_parameters": {},
    "channel_input_dirs": {
        "eval

Notice that although trainig is completed, script mode does not automatically save model files to opt/ml/model for tensorflow estimators. In order to get it to work you have to do 2 things --
modify model_dir in your estimator and a .export_saved_model command with model_dir specified to the default value (opt/ml/model)

If you follow these steps, then deploy should work in estimator mode as shown below. Also your model tar file should be in the custom location where you want it to be saved.

In [13]:
%time
predictor = tensorflow.deploy(initial_instance_count=1,
                             instance_type='ml.c4.xlarge')

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 5.72 µs
---------------------------------------------------------------------------------------!

Make a prediction with an unseen test dataset.

In [26]:
import numpy as np
total_to_test = 100 # or to use the whole test suite, set this to: len(X_test)
num_accurate  = 0
HEIGHT = 28
WIDTH = 28

for i in range(total_to_test):
    result = predictor.predict(np.reshape(data_sets.test.images[i], [-1, HEIGHT*WIDTH]))
    predicted_prob = result['predictions'][0][0]
    predicted_label = round(predicted_prob)
    if y_test[i] == predicted_label:
        num_accurate += 1
        print('PASS. Actual: {:.0f}, Prob: {:.4f}'.format(y_test[i], predicted_prob))
    else:
        print('FAIL. Actual: {:.0f}, Prob: {:.4f}'.format(y_test[i], predicted_prob))
print('Acc: {:.2%}'.format(num_accurate/total_to_test))

ModelError: An error occurred (ModelError) when calling the InvokeEndpoint operation: Received client error (400) from model with message "{ "error": "Serving signature name: \"serving_default\" not found in signature def" }". See https://us-east-2.console.aws.amazon.com/cloudwatch/home?region=us-east-2#logEventViewer:group=/aws/sagemaker/Endpoints/sagemaker-tensorflow-serving-2019-06-04-22-22-25-787 in account 389535300735 for more information.

Delete endpoint when done

In [29]:
sagemaker_session.delete_endpoint(predictor.endpoint)

Remove all data from S3 when done

In [None]:
shutil.rmtree('data', ignore_errors=True)
s3 = boto3.resource('s3')
s3_bucket = s3.Bucket(bucket)
resp = s3_bucket.objects.filter(Prefix=data_prefix + '/').delete()