# MNIST Tensorflow training options

The **SageMaker Python SDK** helps you deploy your models for training and hosting in optimized, productions ready containers in SageMaker. The SageMaker Python SDK is easy to use, modular, extensible and compatible with TensorFlow and MXNet. This tutorial focuses on how to create a convolutional neural network model to train the [MNIST dataset](http://yann.lecun.com/exdb/mnist/) using **TensorFlow training**.



### Set up the environment

In [None]:
import os
import sagemaker
from sagemaker import get_execution_role
import project_path
from lib import utils

sagemaker_session = sagemaker.Session()

role = get_execution_role()

#bucket = '{{s3_workshop_bucket}}'
bucket = 'analytics-serverless-west'

### Download the MNIST dataset

In [None]:
from tensorflow.contrib.learn.python.learn.datasets import mnist
import tensorflow as tf

data_sets = mnist.read_data_sets('data', dtype=tf.uint8, reshape=False, validation_size=5000)

utils.convert_to(data_sets.train, 'train', 'data')
utils.convert_to(data_sets.validation, 'validation', 'data')
utils.convert_to(data_sets.test, 'test', 'data')

### Upload the data
We use the ```sagemaker.Session.upload_data``` function to upload our datasets to an S3 location. The return value inputs identifies the location -- we will use this later when we start the training job.

In [None]:
inputs = sagemaker_session.upload_data(bucket=bucket, path='data', key_prefix='data/DEMO-mnist')

# Construct a script for distributed training 
Here is the full code for the network model:

In [None]:
!cat '../scripts/mnist.py'

## Create a training job using the sagemaker.TensorFlow estimator 

The script here is and adaptation of the [TensorFlow MNIST example](https://github.com/tensorflow/models/tree/master/official/mnist). It provides a ```model_fn(features, labels, mode)```, which is used for training, evaluation and inference. 

## A regular ```model_fn```

A regular **```model_fn```** follows the pattern:
1. [defines a neural network](https://github.com/tensorflow/models/blob/master/official/mnist/mnist.py#L96)
- [applies the ```features``` in the neural network](https://github.com/tensorflow/models/blob/master/official/mnist/mnist.py#L178)
- [if the ```mode``` is ```PREDICT```, returns the output from the neural network](https://github.com/tensorflow/models/blob/master/official/mnist/mnist.py#L186)
- [calculates the loss function comparing the output with the ```labels```](https://github.com/tensorflow/models/blob/master/official/mnist/mnist.py#L188)
- [creates an optimizer and minimizes the loss function to improve the neural network](https://github.com/tensorflow/models/blob/master/official/mnist/mnist.py#L193)
- [returns the output, optimizer and loss function](https://github.com/tensorflow/models/blob/master/official/mnist/mnist.py#L205)

## Writing a ```model_fn``` for distributed training
When distributed training happens, the same neural network will be sent to the multiple training instances. Each instance will predict a batch of the dataset, calculate loss and minimize the optimizer. One entire loop of this process is called **training step**.

### Syncronizing training steps
A [global step](https://www.tensorflow.org/api_docs/python/tf/train/global_step) is a global variable shared between the instances. It's necessary for distributed training, so the optimizer will keep track of the number of **training steps** between runs: 

```python
train_op = optimizer.minimize(loss, tf.train.get_or_create_global_step())
```

That is the only required change for distributed training!

In [None]:
%%time
from sagemaker.tensorflow import TensorFlow

mnist_estimator = TensorFlow(entry_point='../scripts/mnist.py',
                             role=role,
                             framework_version='1.11.0',
                             training_steps=1000, 
                             evaluation_steps=100,
                             train_instance_count=2,
                             train_instance_type='ml.c4.xlarge')

mnist_estimator.fit(inputs)

The **```fit```** method will create a training job in two **ml.c4.xlarge** instances. The logs above will show the instances doing training, evaluation, and incrementing the number of **training steps**. 

In the end of the training, the training job will generate a saved model for TF serving.

## Training with SageMaker Pipe Mode and TensorFlow using the SageMaker Python SDK

SageMaker Pipe Mode is an input mechanism for SageMaker training containers based on Linux named pipes. SageMaker makes the data available to the training container using named pipes, which allows data to be downloaded from S3 to the container while training is running. For larger datasets, this dramatically improves the time to start training, as the data does not need to be first downloaded to the container. To learn more about pipe mode, please consult the AWS documentation at: https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo.html#your-algorithms-training-algo-running-container-trainingdata.

In this tutorial, we show you how to train a tf.estimator using data read with SageMaker Pipe Mode. We'll use the SageMaker `PipeModeDataset` class - a special TensorFlow `Dataset` built specifically to read from SageMaker Pipe Mode data. This `Dataset` is available in our TensorFlow containers for TensorFlow versions 1.7.0 and up. It's also open-sourced at https://github.com/aws/sagemaker-tensorflow-extensions and can be built into custom TensorFlow images for use in SageMaker. 

Although you can also build the PipeModeDataset into your own containers, in this tutorial we'll show how you can use the PipeModeDataset by launching training from the SageMaker Python SDK. The SageMaker Python SDK helps you deploy your models for training and hosting in optimized, production ready containers in SageMaker. The SageMaker Python SDK is easy to use, modular, extensible and compatible with TensorFlow and MXNet. 

Different collections of S3 files can be made available to the training container while it's running. These are referred to as "channels" in SageMaker. In this example, we use two channels - one for training data and one for evaluation data. Each channel is mapped to S3 files from different directories. The SageMaker PipeModeDataset knows how to read from the named pipes for each channel given just the channel name. When we launch SageMaker training we tell SageMaker what channels we have and where in S3 to read the data for each channel.


In [None]:
from sagemaker.session import Session

# Location to save your custom code in tar.gz format.
custom_code_upload_location = 's3://{}/customcode/tensorflow_pipemode'.format(bucket)

# Location where results of model training are saved.
model_artifacts_location = 's3://{}/sagemaker/artifacts'.format(bucket)

## Complete training source code 

In this section of the tutorial we train a TensorFlow LinearClassifier using pipe mode data. The TensorFlow training script is contained in following file:

In [None]:
!cat "../scripts/pipemode.py"

The above script implements all the functions required for a sagemaker tensorflow training script (See: [Preparing TensorFlow Training Script](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/tensorflow/README.rst#preparing-the-tensorflow-training-script)). 

### Using a PipeModeDataset in an input_fn
To train an estimator using a Pipe Mode channel, we must construct an input_fn that reads from the channel. To do this, we use the SageMaker PipeModeDataset. This is a TensorFlow Dataset specifically created to read from a SageMaker Pipe Mode channel. A PipeModeDataset is a fully-featured TensorFlow Dataset and can be used in exactly the same ways as a regular TensorFlow Dataset can be used.

The training and evaluation data used in this tutorial is synthetic. It contains a series of records stored in a TensorFlow Example protobuf object. Each record contains a numeric class label and an array of 1024 floating point numbers. Each array is sampled from a multi-dimensional Gaussian distribution with a class-specific mean. This means it is possible to learn a model using a TensorFlow Linear classifier which can classify examples well. Each record is separated using RecordIO encoding (though the PipeModeDataset class also supports the TFRecord format as well). 

The training and evaluation data were produced using the benchmarking source code in the sagemaker-tensorflow-extensions benchmarking sub-package. If you want to investigate this further, please visit the GitHub repository for sagemaker-tensorflow-extensions at https://github.com/aws/sagemaker-tensorflow-extensions. 

The following example code shows how to use a PipeModeDataset in an input_fn.

```python
from sagemaker_tensorflow import PipeModeDataset

def input_fn():
    # Simple example data - a labeled vector.
    features = {
        'data': tf.FixedLenFeature([], tf.string),
        'labels': tf.FixedLenFeature([], tf.int64),
    }
    
    # A function to parse record bytes to a labeled vector record
    def parse(record):
        parsed = tf.parse_single_example(record, features)
        return ({
            'data': tf.decode_raw(parsed['data'], tf.float64)
        }, parsed['labels'])

    # Construct a PipeModeDataset reading from a 'training' channel, using
    # the TF Record encoding.
    ds = PipeModeDataset(channel='training', record_format='TFRecord')

    # The PipeModeDataset is a TensorFlow Dataset and provides standard Dataset methods
    ds = ds.repeat(20)
    ds = ds.prefetch(10)
    ds = ds.map(parse, num_parallel_calls=10)
    ds = ds.batch(64)
    
    return ds
```

## Running training using the Python SDK

We can use the SDK to run our local training script on SageMaker infrastructure.

1. Pass the path to the pipemode.py file, which contains the functions for defining your estimator, to the sagemaker.TensorFlow init method.
2. Pass the S3 location that we uploaded our data to previously to the fit() method.

In [None]:
from sagemaker.tensorflow import TensorFlow

tensorflow = TensorFlow(entry_point='../scripts/pipemode.py',
                        role=role,
                        framework_version='1.11.0',
                        input_mode='Pipe',
                        output_path=model_artifacts_location,
                        code_location=custom_code_upload_location,
                        train_instance_count=1,
                        training_steps=1000,
                        evaluation_steps=100,
                        train_instance_type='ml.c4.xlarge')

After we've created the SageMaker Python SDK TensorFlow object, we can call fit to launch TensorFlow training:

In [None]:
%%time
import boto3

# use the region-specific sample data bucket
region = boto3.Session().region_name

# training data that is already partitioned
train_data = 's3://sagemaker-sample-data-{}/tensorflow/pipe-mode/train'.format(region)
eval_data = 's3://sagemaker-sample-data-{}/tensorflow/pipe-mode/eval'.format(region)

tensorflow.fit({'train':train_data, 'eval':eval_data})

After ``fit`` returns, you've successfully trained a TensorFlow LinearClassifier using SageMaker pipe mode! The TensorFlow model data will be stored in '``s3://<bucket-name>/artifacts``' - where '``<bucket-name>``' is the name of the bucket you supplied earlier.

# Deploy the trained model to prepare for predictions

The deploy() method creates an endpoint which serves prediction requests in real-time.

In [None]:
mnist_predictor = mnist_estimator.deploy(initial_instance_count=1,
                                             instance_type='ml.m4.xlarge')

# Perform inference

Now that we've trained a model, we're going to use it to perform inference with a SageMaker endpoint as well as a batch transform job. The request handling behavior of the Endpoint deployed during the transform job is determined by the `mnist.py` script we looked at earlier.

## Invoking the endpoint

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (2,10)

def show_digit(arr):
    two_d = (np.reshape(arr, (28, 28)) * 255).astype(np.uint8)
    plt.imshow(two_d, interpolation='nearest')
    return plt

In [None]:
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data

mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)

for i in range(5):
    data = mnist.test.images[i].tolist()
    tensor_proto = tf.make_tensor_proto(values=np.asarray(data), shape=[1, len(data)], dtype=tf.float32)
    predict_response = mnist_predictor.predict(tensor_proto)
    
    print("========================================")
    label = np.argmax(mnist.test.labels[i])
    print("label is {}".format(label))
    show_digit(mnist.test.images[i]).show()
    prediction = predict_response['outputs']['classes']['int64_val'][0]
    print("prediction is {}".format(prediction))

### Evaluate
We can now use this predictor to classify hand-written digits. Drawing into the image box loads the pixel data into a `data` variable in this notebook, which we can then pass to the `predictor`.

In [None]:
from IPython.display import HTML
HTML(open("input.html").read())

In [None]:
%matplotlib inline
import numpy as np
import imageio
from PIL import Image


image = np.array([data], dtype=np.float32)
two_d = (np.reshape(image, (28, 28)) * 255).astype(np.uint8)
imageio.imwrite("../images/output.png", two_d)

In [None]:
image = Image.open("../images/output.png")
image = image.resize((28, 28), Image.ANTIALIAS)
image = image.convert('L')
image = np.asarray(image, dtype=np.float32)
image = np.reshape(image, (1, 28 * 28))
 
tensor_proto = tf.make_tensor_proto(image, shape=list(image.shape))
predict_response = mnist_predictor.predict(tensor_proto)
print("========================================")
prediction = predict_response['outputs']['classes']['int64_val'][0]
print("prediction is {}".format(prediction))

## SageMaker's transformer class

After training, we use our TensorFlow estimator object to create a `Transformer` by invoking the `transformer()` method. This method takes arguments for configuring our options with the batch transform job; these do not need to be the same values as the one we used for the training job. The method also creates a SageMaker Model to be used for the batch transform jobs.

The `Transformer` class is responsible for running batch transform jobs, which will deploy the trained model to an endpoint and send requests for performing inference.

In [None]:
output_path = 's3://{}/{}'.format(bucket, 'sagemaker/batch-transform')
transformer = mnist_estimator.transformer(instance_count=1, instance_type='ml.m4.xlarge', output_path=output_path)

## Run a batch transform job

For our batch transform job, we're going to use input data that contains 1000 MNIST images, located in the public SageMaker sample data S3 bucket. To create the batch transform job, we simply call `transform()` on our transformer with information about the input data.

In [None]:
input_bucket_name = 'sagemaker-sample-data-{}'.format(region)
input_file_path = 'batch-transform/mnist-1000-samples'

transformer.transform('s3://{}/{}'.format(input_bucket_name, input_file_path), content_type='text/csv')

Now we wait for the batch transform job to complete. We have a convenience method, `wait()`, that will block until the batch transform job has completed. We can call that here to see if the batch transform job is still running; the cell will finish running when the batch transform job has completed.

In [None]:
transformer.wait()

## Download the results

The batch transform job uploads its predictions to S3. Since we did not specify `output_path` when creating the Transformer, one was generated based on the batch transform job name:

In [None]:
print(transformer.output_path)

Now let's download the first ten results from S3:

In [None]:
import json
from six.moves.urllib import parse

import boto3

parsed_url = parse.urlparse(transformer.output_path)
bucket_name = parsed_url.netloc
prefix = parsed_url.path[1:]

s3 = boto3.resource('s3')

predictions = []
for i in range(10):
    file_key = '{}/data-{}.csv.out'.format(prefix, i)

    output_obj = s3.Object(bucket_name, file_key)
    output = output_obj.get()["Body"].read().decode('utf-8')

    predictions.extend(json.loads(output)['outputs']['classes']['int64Val'])

For demonstration purposes, we're also going to download the corresponding original input data so that we can see how the model did with its predictions.

In [None]:
import os

import matplotlib.pyplot as plt
from numpy import genfromtxt

plt.rcParams['figure.figsize'] = (2,10)

def show_digit(img, caption='', subplot=None):
    if subplot == None:
        _,(subplot) = plt.subplots(1,1)
    imgr = img.reshape((28,28))
    subplot.axis('off')
    subplot.imshow(imgr, cmap='gray')
    plt.title(caption)

tmp_dir = '/tmp/data'
if not os.path.exists(tmp_dir):
    os.makedirs(tmp_dir)

for i in range(10):
    input_file_name = 'data-{}.csv'.format(i)
    input_file_key = '{}/{}'.format(input_file_path, input_file_name)
    
    s3.Bucket(input_bucket_name).download_file(input_file_key, os.path.join(tmp_dir, input_file_name))
    input_data = genfromtxt(os.path.join(tmp_dir, input_file_name), delimiter=',')

    show_digit(input_data)

Here, we can see the original labels are:

```
7, 2, 1, 0, 4, 1, 4, 9, 5, 9
```

Now let's print out the predictions to compare:

In [None]:
print(', '.join(predictions))

## Serving the world
We already saw how to interact locally with the SageMaker endpoint, but what if you want to set something up and let others interact with the server, persistently?

## Python Serverless Microframework for AWS

[Chalice](https://github.com/awslabs/chalice) is a microframework for writing serverless apps in python. It allows you to quickly create and deploy applications that use AWS Lambda. It provides:

* A command line tool for creating, deploying, and managing your app
* A decorator based API for integrating with Amazon API Gateway, Amazon S3, Amazon SNS, Amazon SQS, and other AWS services.
* Automatic IAM policy generation

In [None]:
!pip install chalice

In [None]:
!chalice new-project mnist-hosting
!cd mnist-hosting
!mkdir .chalice

In [None]:
%%writefile requirements.txt

numpy
tensorflow

In [None]:
print(mnist_predictor.endpoint)

### Config required to deploy Chalice project with API Gateway endpoint

In [None]:
%%writefile .chalice/config.json
{
  "version": "2.0",
  "app_name": "predictor",
  "autogen_policy": false,
  "environment_variables": {
    "ENDPOINT_NAME": "sagemaker-tensorflow-2018-11-24-17-39-35-691"
  },
  "stages": {
    "dev": {
      "api_gateway_stage": "api"
    }
  }
}

### IAM Policy required for Lambda function to invoke the SageMaker inference endpoint

In [None]:
%%writefile .chalice/policy-dev.json

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "sagemaker:InvokeEndpoint",
            "Resource": "*"
        },
        {
            "Sid": "VisualEditor2",
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}

### Lambd function used to take the PNG posted to the API and pass it to the inference endpoint

In [None]:
%%writefile app.py

from chalice import Chalice
from chalice import BadRequestError
import base64, os, boto3, ast
import numpy as np
import tensorflow as tf

app = Chalice(app_name='mnist-prediction')
app.debug=True

@app.route('/', methods=['POST'])
def index():
    body = app.current_request.json_body

    if 'data' not in body:
        raise BadRequestError('Missing image data')
    if 'ENDPOINT_NAME' not in os.environ:
        raise BadRequestError('Missing endpoint')

    image = base64.b64decode(body['data']) # byte array
    endpoint = os.environ['ENDPOINT_NAME'] 

    print("%s" % (endpoint))
    image = np.asarray(image, dtype=np.float32)
    image = np.reshape(image, (1, 28 * 28))

    tensor_proto = tf.make_tensor_proto(image, shape=list(image.shape))
    
    runtime = boto3.Session().client(service_name='sagemaker-runtime', region_name='us-west-2')
    response = runtime.invoke_endpoint(EndpointName=endpoint, ContentType='application/x-image', Body=tensor_proto)
#    probs = response['Body'].read().decode() # byte array
    
#    probs = ast.literal_eval(probs) # array of floats
#    probs = np.array(probs) # numpy array of floats
#    print(probs)
#    top_indexes = probs.argsort() # indexes in ascending order of probabilities
#    top_indexes = top_indexes[::-1][:1] # indexes for top k probabilities in descending order

#     top_categories = []
#     for i in top_indexes:
#        top_categories.append((i+1, probs[i]))

#     return {'response': str(top_categories)}
    return response

In [None]:
!chalice deploy

Make a call to the endpoint with an image

In [None]:
import boto3

file_name = 'floppy.jpg'
endpoint_name = 'DEMO-imageclassification-ep--2018-04-23-19-55-49'
runtime = boto3.Session().client(service_name='sagemaker-runtime',region_name='us-east-1')

with open(file_name, 'rb') as f:
        payload = f.read()
        payload = bytearray(payload)
response = runtime.invoke_endpoint(EndpointName=endpoint_name, ContentType='application/x-image', Body=payload)
print(response['Body'].read())

### Deleting the Chalice application

In [None]:
!chalice delete

# Deleting the endpoint

In [None]:
sagemaker.Session().delete_endpoint(mnist_predictor.endpoint)