# Machine Learning - Training and Serving Models with TensorFlow

This post steps you through training a machine learning model and deploying it for inference using TensorFlow. You will build and train a model using TensorFlow's `Estimator` class and then deploy it with the [TensorFlow Serving API](https://www.tensorflow.org/api_docs/serving/).

Recent releases of TensorFlow deprecated some very useful experiment utilities such as `tf.contrib.learn.Experiment` and `tf.contrib.learn.learn_runner`. These utilities have been replaced by the function `tf.estimator.train_and_evaluate`, which was added to the `tf.estimator` module.

## tf.estimator.Estimator
The `Estimator` class is used for training and evaluating TensorFlow models. A model specified by a `model_fn` is wrapped in the `Estimator` object, and the `model_fn` returns the necessary operations for training, evaluation or prediction. You can read more about this class [here](https://www.tensorflow.org/api_docs/python/tf/estimator/Estimator).

## Step 1: Define your input function
The input function tells the `Estimator` class how to get its training and evaluation data. This function must return `features` and `labels`, and its output forms the input to the `model_fn` described in Step 2 below. We define a `generate_input_fn` that builds the training and evaluation datasets. We make use of TensorFlow's `Dataset` API, which represents an input pipeline as a collection of elements and allows you to apply transformations such as batching, shuffling and mapping functions over the dataset.

In this example we use the `TFRecordDataset` and define a mapping function `parse_function`. To demonstrate the use of `tfrecords`, we convert the MNIST dataset into tfrecords (shown in the script `mnist_tfrecord_creator.py`). The `parse_function` takes a single serialised `Example` as input and contains a features `dict` that maps each feature key to its expected shape and type. It returns a `dict` mapping feature keys to `Tensor` and `SparseTensor` values and is applied to each `Example` in the input tfrecords. We then optionally shuffle the dataset, repeat it for the requested number of epochs, set the batch size and create an `Iterator`.

In [None]:
def generate_input_fn(
        filenames,
        num_epochs=None,
        shuffle=True,
        batch_size=32):
    def parse_function(example):
        features = {
            'image': tf.FixedLenFeature(
                shape=[784],
                dtype=tf.float32),
            'label': tf.FixedLenFeature(
                shape=[10],
                dtype=tf.float32),
            'id': tf.FixedLenFeature(
                shape=[1],
                dtype=tf.float32)
        }

        return tf.parse_single_example(example, features=features)
    
    dataset = tf.data.TFRecordDataset(filenames).map(parse_function)

    if shuffle:
        dataset = dataset.shuffle(buffer_size=batch_size * 10)
    dataset = dataset.repeat(num_epochs)
    dataset = dataset.batch(batch_size)
    iterator = dataset.make_one_shot_iterator()
    features = iterator.get_next()
    
    return features, features.pop('label')
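
To make the order of these transformations concrete, here is a plain-Python sketch of what shuffling with a buffer, repeating and batching do conceptually. It is not TensorFlow code: the helpers `shuffle_buffer`, `repeat` and `batch` and the toy records are made up for illustration, and the real `Dataset` implementations differ in detail.

```python
import itertools
import random


def shuffle_buffer(items, buffer_size, seed=0):
    """Yield items in a locally shuffled order using a fixed-size buffer."""
    rng = random.Random(seed)
    buffer = []
    for item in items:
        buffer.append(item)
        if len(buffer) > buffer_size:
            # Emit a random element once the buffer is over capacity.
            yield buffer.pop(rng.randrange(len(buffer)))
    # Drain whatever is left at the end of the input.
    rng.shuffle(buffer)
    for item in buffer:
        yield item


def repeat(make_epoch, num_epochs=None):
    """Replay the data num_epochs times, or indefinitely when num_epochs is None."""
    epochs = itertools.count() if num_epochs is None else range(num_epochs)
    for epoch in epochs:
        for item in make_epoch(epoch):
            yield item


def batch(items, batch_size):
    """Group consecutive items into lists of at most batch_size elements."""
    iterator = iter(items)
    while True:
        chunk = list(itertools.islice(iterator, batch_size))
        if not chunk:
            return
        yield chunk


records = list(range(10))  # stand-in for parsed examples
pipeline = batch(
    repeat(lambda epoch: shuffle_buffer(records, buffer_size=4, seed=epoch),
           num_epochs=2),
    batch_size=4)
batches = list(pipeline)
```

With ten records and a batch size of four, the third batch mixes the end of the first epoch with the start of the second, because repeating happens before batching.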

We define our training and evaluation input to the `Estimator` as follows:

In [None]:
train_input = lambda: model.generate_input_fn(
    args.train_files,
    num_epochs=hparams.num_epochs,
    batch_size=hparams.train_batch_size,
)
eval_input = lambda: model.generate_input_fn(
    args.eval_files,
    batch_size=hparams.eval_batch_size,
    shuffle=False
)

## Step 2: Define your model function
For the purposes of this post we have taken the Deep MNIST model defined in this [tutorial](https://www.tensorflow.org/versions/r1.1/get_started/mnist/pros) and adjusted it for use with the `Estimator` class. Source code for the original Deep MNIST model used in the aforementioned tutorial is found [here](https://github.com/tensorflow/tensorflow/blob/master/tensorflow/examples/tutorials/mnist/mnist_deep.py).

The first step in converting the original Deep MNIST model is to remove all placeholders, as using the `Estimator` class removes the need for a feed dictionary, and thus for placeholders. The keep probability for dropout used in Deep MNIST now becomes a hyperparameter defined in `hparams`. The `hparams` argument contains all hyperparameters; it is only passed along by the `Estimator` and is not inspected, so its structure is entirely up to you. The modified `deepnn` function from the original Deep MNIST tutorial is defined in `model.py`.

The `model_fn` is defined below. It has two required inputs, `features` and `labels`, and returns an `EstimatorSpec` instance. `features` and `labels` are returned from the `input_fn` discussed above. Optionally, the `mode` can also be passed to your `model_fn`, which specifies whether the model is in training, evaluation or prediction. See [ModeKeys](https://www.tensorflow.org/api_docs/python/tf/estimator/ModeKeys) for further information on this parameter.

Depending on the mode, the required input arguments to the `EstimatorSpec` instance are different. See [here](https://www.tensorflow.org/api_docs/python/tf/estimator/EstimatorSpec) for more information. For training and evaluation specifically, we pass `loss`, `train_op` and `eval_metric_ops`. For prediction, we pass `export_outputs`, which is used during serving and defines the output signatures to be exported to `SavedModel`.

In [None]:
def generate_model_fn(hparams):
    def _model_fn(mode, features, labels):
        image = features['image']
        y_conv = deepnn(image, hparams.keep_prob)
        prediction = tf.argmax(y_conv, 1)

        if mode in [tf.estimator.ModeKeys.TRAIN, tf.estimator.ModeKeys.EVAL]:
            loss = calculate_loss(labels, y_conv)
            extra_update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
            with tf.control_dependencies(extra_update_ops):
                train_op = tf.train.AdamOptimizer(hparams.learning_rate).minimize(
                        loss,
                        global_step=tf.train.get_global_step())
            
            eval_metrics = get_eval_metrics(labels, y_conv)
            
            # Add tensorboard summaries
            tf.summary.scalar('loss', loss)
            tf.summary.image('input', tf.reshape(image, [-1, 28, 28, 1]))
            export_outputs = None
        else:
            loss = None
            train_op = None
            eval_metrics = None
            export_outputs = {
                'labels': tf.estimator.export.PredictOutput({
                    'label': prediction,
                    'id': features['id'],
                })}

        return tf.estimator.EstimatorSpec(
            mode,
            loss=loss,
            train_op=train_op,
            eval_metric_ops=eval_metrics,
            predictions={
                # Report the predicted class index directly; a softmax over
                # argmax indices would not give class probabilities.
                'label': prediction},
            export_outputs=export_outputs
            )
    return _model_fn
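
The mode-dependent requirements can be summarised in a small plain-Python sketch. It does not call TensorFlow: the `check_spec_args` helper is invented for illustration, and the rules it encodes are the ones listed in the `EstimatorSpec` documentation (`loss` and `train_op` for training, `loss` for evaluation, `predictions` for prediction). The string keys mirror the values of `ModeKeys`.

```python
# Required EstimatorSpec arguments per mode, as listed in the EstimatorSpec docs.
REQUIRED_ARGS = {
    'train': ('loss', 'train_op'),
    'eval': ('loss',),
    'infer': ('predictions',),
}


def check_spec_args(mode, **spec_args):
    """Return the names of required arguments that are missing (None) for a mode."""
    return [name for name in REQUIRED_ARGS[mode] if spec_args.get(name) is None]


# In prediction mode, loss and train_op may be None as long as predictions exist.
missing_for_predict = check_spec_args(
    'infer', loss=None, train_op=None, predictions={'label': [7]})

# In training mode, a missing train_op is reported.
missing_for_train = check_spec_args('train', loss=0.3, train_op=None)
```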

If you wish to train your model across multiple GPUs, a small change must be made to the optimization process in the model function so that the correct gradients and associated weight updates are computed and synchronised on each card. TensorFlow provides the wrapper class `tf.contrib.estimator.TowerOptimizer`, which transforms a standard optimizer into one that does this for us. If you don't have access to multiple GPUs, a single GPU or CPU is perfectly sufficient for this tutorial.

In [None]:
optimizer = tf.train.AdamOptimizer(hparams.learning_rate)
tower_optimizer = tf.contrib.estimator.TowerOptimizer(optimizer)
train_op = tower_optimizer.minimize(loss, global_step=tf.train.get_global_step())

Don't worry too much about what this is doing under the hood just yet. We will explain it in a bit more detail once we get to training time!

I should also point out that multi-GPU training is most likely a little overkill for something as "simple" as MNIST. Whilst it may well reduce your training time, distributed training becomes a lot more useful when working with more complex problems and much larger networks.

## Step 3: Define your serving function
For the purposes of serving, we define a `serving_input_receiver_fn`. This function expects a serialized `Example` and parses it according to the provided `feature_spec`. It is also used for defining our model exports. Our serving input function is defined as:

In [None]:
def example_serving_input_fn():
    features = {
        'image': tf.FixedLenFeature(
            shape=[784],
            dtype=tf.float32),
        'label': tf.FixedLenFeature(
            shape=[10],
            dtype=tf.float32),
        'id': tf.FixedLenFeature(
            shape=[1],
            dtype=tf.float32),
    }
    return tf.estimator.export.build_parsing_serving_input_receiver_fn(features)()

## Step 4: Set up your experiment
In this section we describe how to set up your experiment using the functions defined above and how to create and use the `Estimator` class.
Previously we mentioned `hparams`. `hparams` is defined by the input arguments to our experiment. For convenience we define `task.py`, which defines all the input arguments to our model (including hyperparameters, training and evaluation files, output directory, etc.) and sets up our experiment. In here we map the input arguments for our experiment to the `HParams` class, and this object holds the input arguments as name-value pairs.
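
As a rough sketch of that mapping, the snippet below parses command-line arguments and exposes them as attributes. The flag names and default values are assumptions chosen to match the attributes used in this post (`train_batch_size`, `learning_rate`, and so on), not the contents of the actual `task.py`, and `types.SimpleNamespace` stands in for the hyperparameter class so that the snippet runs without TensorFlow.

```python
import argparse
from types import SimpleNamespace


def build_parser():
    """Define the experiment's input arguments (names and defaults are illustrative)."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-files', nargs='+', required=True)
    parser.add_argument('--eval-files', nargs='+', required=True)
    parser.add_argument('--job-dir', required=True)
    parser.add_argument('--num-epochs', type=int, default=None)
    parser.add_argument('--train-batch-size', type=int, default=32)
    parser.add_argument('--eval-batch-size', type=int, default=32)
    parser.add_argument('--learning-rate', type=float, default=1e-4)
    parser.add_argument('--keep-prob', type=float, default=0.5)
    parser.add_argument('--max-steps', type=int, default=1000)
    parser.add_argument('--eval-steps', type=int, default=100)
    return parser


# Parse an explicit argument list here; a real script would read sys.argv.
args = build_parser().parse_args([
    '--train-files', 'train.tfrecords',
    '--eval-files', 'eval.tfrecords',
    '--job-dir', 'output',
    '--learning-rate', '0.001',
])

# Hold every argument as a name-value pair, accessible as hparams.<name>.
hparams = SimpleNamespace(**vars(args))
```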

#### Training and Evaluation Input Functions
`tf.estimator.train_and_evaluate` requires us to define an `EvalSpec` and a `TrainSpec` instance. We define these below. The `EvalSpec` class requires the `eval_input` function defined above. In addition, we pass in a couple of optional arguments including:
1. steps: the number of evaluation steps (default is 100)
1. exporters: an iterable of `exporters` or a single one (default is None)
1. throttle_secs: the time to wait between evaluations, in seconds (default is 600)

`exporter`s define the type of model export. For this example we use the `FinalExporter` class, which performs a single model export at the end of training. There are other exporters available that allow you to set the frequency of exports and how many exports to keep.
`throttle_secs` defines the time between evaluations, and each evaluation uses the latest checkpoint. Note that if there are no new checkpoints, evaluation will not be performed. The default is 600 seconds, but because the MNIST model trains so quickly we set the time to 60 seconds to obtain regular evaluations. Unfortunately, the `Estimator` class does not allow you to set the frequency of evaluation according to the number of training steps completed. There is a current feature request to add this functionality [here](https://github.com/tensorflow/tensorflow/issues/17650#issuecomment-385097233) if you would like to support it.

In [None]:
exporters = []
exporters.append(tf.estimator.FinalExporter(
    'mnist', 
    model.example_serving_input_fn))
eval_spec = tf.estimator.EvalSpec(
        eval_input, 
        steps=hparams.eval_steps, 
        exporters=exporters,
        throttle_secs=60)

train_spec = tf.estimator.TrainSpec(
        train_input,
        max_steps=hparams.max_steps)
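
The interaction between `throttle_secs` and checkpoints can be pictured with a small plain-Python sketch. It only models the behaviour described above and is not TensorFlow's implementation; `should_evaluate` and its arguments are hypothetical names.

```python
def should_evaluate(now, last_eval_time, throttle_secs,
                    latest_checkpoint, last_evaluated_checkpoint):
    """Decide whether an evaluation should run at time `now` (in seconds)."""
    if now - last_eval_time < throttle_secs:
        return False  # still inside the throttle window
    if latest_checkpoint == last_evaluated_checkpoint:
        return False  # no new checkpoint, nothing new to evaluate
    return True


# 30 seconds after the last evaluation with throttle_secs=60: too early.
too_soon = should_evaluate(30, 0, 60, 'model.ckpt-200', 'model.ckpt-100')

# Throttle window has passed, but the checkpoint has not changed.
no_new_checkpoint = should_evaluate(90, 0, 60, 'model.ckpt-100', 'model.ckpt-100')

# Throttle window has passed and a newer checkpoint exists.
ready = should_evaluate(90, 0, 60, 'model.ckpt-200', 'model.ckpt-100')
```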

#### Create Estimator
Next we define our `Estimator` object that takes as input the `model_fn` described above. In addition, we pass in an optional parameter `config` to demonstrate modifying the default configuration parameters for the `Estimator`. For this object we only define the `model_dir` but other parameters that can be modified include:
1. frequency in steps to save summaries
1. number of checkpoints to keep

For the full list please see [RunConfig](https://www.tensorflow.org/api_docs/python/tf/estimator/RunConfig).

Our `Estimator` is defined as follows:

In [None]:
estimator = tf.estimator.Estimator(
        model.generate_model_fn(hparams),
        config=tf.estimator.RunConfig(
            model_dir=hparams.job_dir,
        )
)

If you intend to train and evaluate your model on multiple GPUs (the `model_fn` from Step 2 must have been modified appropriately), TensorFlow provides another convenient wrapper function to help with this. `tf.contrib.estimator.replicate_model_fn` takes the model function returned by `generate_model_fn` and replicates the model across the available GPUs. Replication pins identical copies of the model and its associated ops on each card (referred to generally as a Tower), whilst variables are pinned to the CPU and sharded and distributed to each Tower during the forward pass of the graph. On the backward pass, losses are computed separately on each Tower and then aggregated on the CPU. The aggregated loss is then used by the `TowerOptimizer` we implemented in our model function to compute the gradients and update the weights for each Tower. It can be helpful to analyse the visual graph of your model to fully understand this distributed backpropagation loop.

In [None]:
estimator = tf.estimator.Estimator(
        tf.contrib.estimator.replicate_model_fn(model.generate_model_fn(hparams)),
        config=tf.estimator.RunConfig(
            model_dir=hparams.job_dir,
        )
)
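
The idea behind the tower setup can be illustrated without TensorFlow. The sketch below splits one batch across two hypothetical towers, computes a gradient on each shard and averages the results; for equally sized shards this matches the gradient of the mean loss on the full batch. It is a conceptual illustration using a one-parameter linear model, not a description of how `replicate_model_fn` or `TowerOptimizer` are implemented, and all names in it are made up.

```python
def mse_gradient(w, xs, ys):
    """Gradient of mean((w * x - y) ** 2) with respect to w."""
    return sum(2.0 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]
w = 0.5

# Full-batch gradient, as a single device would compute it.
full_gradient = mse_gradient(w, xs, ys)

# Split the batch into two equal shards, one per tower.
shards = [(xs[:2], ys[:2]), (xs[2:], ys[2:])]
tower_gradients = [mse_gradient(w, sx, sy) for sx, sy in shards]

# Aggregate by averaging the per-tower gradients.
aggregated_gradient = sum(tower_gradients) / len(tower_gradients)
```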

#### Training and Evaluation
To perform training and evaluation we use the utility function `tf.estimator.train_and_evaluate`. We simply pass in our `estimator`, `train_spec` and `eval_spec` as shown below. This function performs training and evaluation to the given specifications defined above. 

In [None]:
tf.estimator.train_and_evaluate(
    estimator, 
    train_spec, 
    eval_spec)

## Step 5: Set up your model for serving
Once your model has finished training and produced a `SavedModel` export, you can now set up your model for serving.
#### Check your saved model exports
This next step is more of a sanity check to confirm the input and output tensors for our model and the signature name. We defined these earlier in our `export_outputs` in the `EstimatorSpec` and in the `example_serving_input_fn`. These definitions are needed in order to query our servable. TensorFlow provides a [SavedModel CLI](https://www.tensorflow.org/versions/r1.2/programmers_guide/saved_model_cli) for inspecting `SavedModels`. To show all available information we run the following command (note that `job-dir` was defined previously for our `Estimator`; it can be a Google Cloud Storage bucket or a local directory):

`saved_model_cli show --dir <job-dir>/logs/Experiment#/export/mnist/<timestamp> --all`

The output from the above is:

```
MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['labels']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['examples'] tensor_info:
        dtype: DT_STRING
        shape: (-1)
        name: input_example_tensor:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['id'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 1)
        name: ParseExample/ParseExample:0
    outputs['label'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: ArgMax:0
  Method name is: tensorflow/serving/predict

signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['examples'] tensor_info:
        dtype: DT_STRING
        shape: (-1)
        name: input_example_tensor:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['id'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 1)
        name: ParseExample/ParseExample:0
    outputs['label'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: ArgMax:0
  Method name is: tensorflow/serving/predict

```

#### Build your docker image

We will run our servable inside a docker container. For Docker installation instructions and more information please visit: https://docs.docker.com/engine/installation/.

We start by defining the required dependencies and package installation instructions in a Dockerfile. The dependencies for TensorFlow Serving are listed here: https://github.com/tensorflow/serving/blob/master/tensorflow_serving/g3doc/setup.md. Please refer to the Dockerfile that accompanies this post.
I will draw your attention to the last line in the Dockerfile:
```
CMD /usr/bin/tensorflow_model_server --port=<container-port> --model_name=<model_name> --model_base_path=/exports
```
This is the line that runs the servable inside the docker container. You will need to remember the `model_name` for the gRPC client we will set up later to query the servable. The `--port` argument is the port exposed in the container.

Next we build our docker image with the following:

`docker build -t <image_name> .`

To run the servable in the container we need to copy the model exports into the container or mount them when we run it. TensorFlow Serving supports versioning, so we need to make sure the model exports are saved in numbered directories. Your directory structure should look something like this (assuming Version 1):
```
exports/1/variables/...
exports/1/saved_model.pb
```
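
Since the exporter writes each export into a timestamped directory, one way to obtain the numbered layout is to copy the export into a version folder. The sketch below does this with the standard library; the helper name `stage_export`, the paths, the example timestamp and the fake export created in a temporary directory are all placeholders for illustration.

```python
import os
import shutil
import tempfile


def stage_export(export_dir, serving_root, version):
    """Copy a SavedModel export into <serving_root>/<version> and return that path."""
    target = os.path.join(serving_root, str(version))
    shutil.copytree(export_dir, target)
    return target


# Build a fake export so the sketch runs on its own.
workspace = tempfile.mkdtemp()
export_dir = os.path.join(workspace, 'export', 'mnist', '1526000000')
os.makedirs(os.path.join(export_dir, 'variables'))
open(os.path.join(export_dir, 'saved_model.pb'), 'wb').close()

# Stage the export as version 1 under an "exports" directory.
staged = stage_export(export_dir, os.path.join(workspace, 'exports'), version=1)
staged_files = sorted(os.listdir(staged))
```
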
To run the docker container and mount the model exports we run the following command:

`docker run -it -p <external-port>:<container-port> -v <model-exports-directory>:/exports -e MODEL_NAME=<model_name> --name <container-name> <image-name>`

Note that `/exports` is the `model_base_path` that we defined in our Dockerfile. The above command runs the container in the foreground. If you wish to run it in the background, replace `-it` with `-d`. If running in the background, you can view the logs with:
`docker logs <container-name>`

## Step 6: Set up gRPC client

TensorFlow Serving uses the gRPC protocol, so we need to create a client to issue inference requests. First, we need to install the Python gRPC libraries:
```
pip install grpcio grpcio-tools 
```
The noteworthy parts of creating your client are as follows:

1. Firstly, we create our prediction request object:

    ```
    request = predict_pb2.PredictRequest() 
    ```
2. Secondly, we initialise the prediction request object with the details of our model. The details required are:
    * Model name
    * Signature name

    The model name is the one we defined when we started our model server. The signature name is as defined in our `export_outputs` from Step 2. Alternatively, we can use the `DEFAULT_SERVING_SIGNATURE_DEF_KEY` defined [here](https://www.tensorflow.org/api_docs/python/tf/saved_model/signature_constants).

    ```
     request.model_spec.name = model_name
     request.model_spec.signature_name = signature_name
    ```
3. Lastly we define our input data for inference and issue the request.
    The input data for inference needs to be a `TensorProto`. We get this by first creating `tf.train.Example`s and serialising these to a string. Similar to what we did in previous steps we define our features and create examples from the input data:

In [None]:
features = {
   "image":
   tf.train.Feature(
       float_list=tf.train.FloatList(value=image.flatten().tolist())),
   "label":
   tf.train.Feature(
       float_list=tf.train.FloatList(value=label.flatten().tolist())),
   "id":
   tf.train.Feature(
       float_list=tf.train.FloatList(value=[_id])),
}
features = tf.train.Features(feature=features)
example = tf.train.Example(features=features)
example_serialized = example.SerializeToString()

These serialised examples become the input to our request:
```
request.inputs['examples'].CopyFrom(tf.contrib.util.make_tensor_proto([example_serialized], dtype=tf.string))
```
We call prediction on our servable with:
```
result = self.stub.Predict(request, timeout) 
```
For convenience we create a ServableClient class in `servable_client.py` which implements the above.

Following is a complete example of creating the client, defining the input data, querying the servable and viewing the results. The below can also be found in `mnist_query.py`.

In [1]:
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
from servable_client import ServableClient, create_features
import numpy as np


In [2]:
mnist = input_data.read_data_sets("data/", one_hot=True)
val = mnist.validation.next_batch(10)
features = []
i = 0
for img, label in zip(val[0], val[1]):
    features.append(create_features(img, label, i))
    i += 1

Extracting data/train-images-idx3-ubyte.gz
Extracting data/train-labels-idx1-ubyte.gz
Extracting data/t10k-images-idx3-ubyte.gz
Extracting data/t10k-labels-idx1-ubyte.gz


In [3]:
servable_client = ServableClient("10.10.0.10", "mnist-serving", port=9001)
results = []
for feature in features:
    result = servable_client.do_inference(feature)
    label = result.outputs['label'].int64_val[0]
    results.append(label)

In [4]:
for label, pred in zip(val[1], results):
    print('Predicted: %d' % pred, 'Label: %d' % np.argmax(label))

('Predicted: 9', 'Label: 9')
('Predicted: 8', 'Label: 8')
('Predicted: 8', 'Label: 8')
('Predicted: 0', 'Label: 0')
('Predicted: 1', 'Label: 1')
('Predicted: 8', 'Label: 8')
('Predicted: 1', 'Label: 1')
('Predicted: 6', 'Label: 6')
('Predicted: 5', 'Label: 5')
('Predicted: 0', 'Label: 0')
