# Training and Deploying TensorFlow Models at Scale

In [1]:
# FIXME: make autocompletion work again
%config Completer.use_jedi = False

import os

# OpenAI gym
import gym

%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

# Get smooth animations
mpl.rc('animation', html='jshtml')

physical_devices = tf.config.list_physical_devices('GPU')

if not physical_devices:
    print("No GPU was detected.")
else:
    # https://stackoverflow.com/a/60699372
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
    
from tensorflow import keras

No GPU was detected.


## Deploying TensorFlow models to TensorFlow Serving
*TensorFlow Serving (TFS)* provides simple REST and gRPC APIs and handles model versioning, graceful updates, and rollbacks (blue-green or stop-the-world).

*Note: It's generally a good idea to include the preprocessing logic in the model itself so that clients can send raw data and don't have to duplicate the preprocessing steps. This, however, requires that all preprocessing is done with TF-only functions. Otherwise it won't be compiled and saved in the computation graph.*
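To make the note above concrete, here is a minimal sketch of a model that does its own pixel scaling. The name `model_with_preprocessing` and the single-layer architecture are illustrative assumptions, not the model trained in this notebook; the point is only that the division by 255 is expressed with TF ops inside the model, so a client could send raw `uint8` pixels.

```python
import numpy as np
import tensorflow as tf
from tensorflow import keras

# The model accepts raw pixel values in [0, 255].
raw_inputs = keras.Input(shape=[28, 28, 1], dtype=tf.uint8)

# TF-only preprocessing: this op becomes part of the model's graph.
scaled = keras.layers.Lambda(
    lambda t: tf.cast(t, tf.float32) / 255.0
)(raw_inputs)

# Illustrative (untrained) classifier head.
flat = keras.layers.Flatten()(scaled)
outputs = keras.layers.Dense(10, activation="softmax")(flat)
model_with_preprocessing = keras.Model(inputs=raw_inputs, outputs=outputs)

# A client can now pass unscaled images directly.
raw_batch = np.random.randint(0, 256, size=(2, 28, 28, 1), dtype=np.uint8)
probas = model_with_preprocessing.predict(raw_batch, verbose=0)
```

The rest of this notebook keeps the scaling outside the model (see the next cell), so its clients must send already-scaled `float32` data.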

In [2]:
# Fetch MNIST dataset
(X_train_full, y_train_full), (X_test, y_test) = keras.datasets.mnist.load_data()

# Scale the data
max_value = 255.
X_train_full = X_train_full[..., np.newaxis].astype(np.float32) / max_value
X_test = X_test[..., np.newaxis].astype(np.float32) / max_value

# Split the raw training data to training and validation sets
valid_split = 5000
X_valid, X_train = X_train_full[:valid_split], X_train_full[valid_split:]
y_valid, y_train = y_train_full[:valid_split], y_train_full[valid_split:]

# Use first couple of test instances for predictions
X_new = X_test[:3]

### Save/Load a SavedModel

In [3]:
# Set RNG state
np.random.seed(42)
tf.random.set_seed(42)

# Build model v1
model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(100, activation="relu"),
    keras.layers.Dense(10, activation="softmax"),
])

model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=keras.optimizers.SGD(learning_rate=1e-2),
    metrics=["accuracy"],
)

# Train the model
model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7f87a5434400>

In [4]:
# Test model's predictions
np.round(model.predict(X_new), 2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]],
      dtype=float32)

In [5]:
model_version = "0001"
model_name = "my_mnist_model"
model_path = os.path.join("data", model_name, model_version)
model_path

'data/my_mnist_model/0001'

In [6]:
!rm -rf data/{model_name}

In [7]:
# Save the model in `SavedModel` format/structure
tf.saved_model.save(model, model_path)

INFO:tensorflow:Assets written to: data/my_mnist_model/0001/assets


Let's print the structure of the `SavedModel`:
* `saved_model.pb` is a protobuf-serialized computation graph of the model
* `variables/` contains all the weights, possibly split into multiple files
* `assets/` contains additional data such as examples, dictionary tables, etc.

In [8]:
!tree data/{model_name}

data/my_mnist_model
└── 0001
    ├── assets
    ├── saved_model.pb
    └── variables
        ├── variables.data-00000-of-00001
        └── variables.index

3 directories, 3 files


The `saved_model_cli` tool is handy for inspecting a saved model and for running predictions (e.g. for debugging).

In [9]:
!saved_model_cli show --dir {model_path}

2021-03-05 08:32:36.977697: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-03-05 08:32:36.977722: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
The given SavedModel contains the following tag-sets:
'serve'


In [10]:
!saved_model_cli show --dir {model_path} --tag_set serve

2021-03-05 08:32:39.920243: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-03-05 08:32:39.920270: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
The given SavedModel MetaGraphDef contains SignatureDefs with the following keys:
SignatureDef key: "__saved_model_init_op"
SignatureDef key: "serving_default"


In [11]:
!saved_model_cli show --dir {model_path} --tag_set serve --signature_def serving_default

2021-03-05 08:32:42.201065: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-03-05 08:32:42.201091: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
The given SavedModel SignatureDef contains the following input(s):
  inputs['flatten_input'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 28, 28, 1)
      name: serving_default_flatten_input:0
The given SavedModel SignatureDef contains the following output(s):
  outputs['dense_1'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 10)
      name: StatefulPartitionedCall:0
Method name is: tensorflow/serving/predict


In [12]:
!saved_model_cli show --dir {model_path} --all

2021-03-05 08:32:45.003892: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-03-05 08:32:45.003918: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.

MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['__saved_model_init_op']:
  The given SavedModel SignatureDef contains the following input(s):
  The given SavedModel SignatureDef contains the following output(s):
    outputs['__saved_model_init_op'] tensor_info:
        dtype: DT_INVALID
        shape: unknown_rank
        name: NoOp
  Method name is: 

signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['flatten_input'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 28, 28, 1)
        name: se

In [13]:
# Save the input instances to a numpy file
inputs_path = os.path.join("data", "my_mnist_tests.npy")
np.save(inputs_path, X_new)

In [14]:
# Get the name of the input layer in the model
input_name = model.input_names[0]
input_name

'flatten_input'

Use the CLI to make predictions on these test instances.

In [15]:
!saved_model_cli run --dir {model_path} --tag_set serve \
                     --signature_def serving_default    \
                     --inputs {input_name}={inputs_path}

2021-03-05 08:32:47.632133: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-03-05 08:32:47.632163: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2021-03-05 08:32:49.435242: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-03-05 08:32:49.435398: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2021-03-05 08:32:49.435411: W tensorflow/stream_executor/cuda/cuda_driver.cc:326] failed call to cuInit: UNKNOWN ERROR (303)
2021-03-05 08:32:49.435429: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running 

In [16]:
np.round(
    [[1.1347984e-04, 1.5187356e-07, 9.7032893e-04, 2.7640699e-03, 3.7826971e-06, 7.6876910e-05, 3.9140293e-08, 9.9559116e-01, 5.3502394e-05, 4.2665208e-04],
    [8.2443521e-04, 3.5493889e-05, 9.8826385e-01, 7.0466995e-03, 1.2957400e-07, 2.3389691e-04, 2.5639210e-03, 9.5886099e-10, 1.0314899e-03, 8.7952529e-08],
    [4.4693781e-05, 9.7028232e-01, 9.0526715e-03, 2.2641101e-03, 4.8766597e-04, 2.8800720e-03, 2.2714981e-03, 8.3753867e-03, 4.0439744e-03, 2.9759688e-04]],
    2,
)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])
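A SavedModel can also be loaded back in Python with `tf.saved_model.load` and queried through its `serving_default` signature. The sketch below is self-contained, so it saves a tiny hypothetical `TinyClassifier` module (an assumption standing in for the trained MNIST model) to a temporary directory before loading it again.

```python
import tempfile

import tensorflow as tf


class TinyClassifier(tf.Module):
    """Hypothetical stand-in for the trained model (all-zero weights)."""

    def __init__(self):
        super().__init__()
        self.w = tf.Variable(tf.zeros([28 * 28, 10]), name="w")
        self.b = tf.Variable(tf.zeros([10]), name="b")

    @tf.function(input_signature=[
        tf.TensorSpec([None, 28, 28, 1], tf.float32, name="images")
    ])
    def __call__(self, images):
        flat = tf.reshape(images, [-1, 28 * 28])
        return {"probas": tf.nn.softmax(flat @ self.w + self.b)}


# Save the module with an explicit serving signature.
export_dir = tempfile.mkdtemp()
module = TinyClassifier()
tf.saved_model.save(
    module, export_dir, signatures={"serving_default": module.__call__}
)

# Load it back and call the signature with keyword arguments.
loaded = tf.saved_model.load(export_dir)
serving_fn = loaded.signatures["serving_default"]
result = serving_fn(images=tf.zeros([3, 28, 28, 1]))
probas = result["probas"].numpy()
```

For the model saved in this notebook, the keyword argument and the output key would be the names reported by `saved_model_cli` above (`flatten_input` and `dense_1`).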

### TensorFlow Serving
Serving the model with TF Serving is as simple as running the following Docker container:
```bash
docker run \
   -it \
   --rm \
   -p 8500:8500 \
   -p 8501:8501 \
   -v "$PWD/data/my_mnist_model:/models/my_mnist_model" \
   -e MODEL_NAME=my_mnist_model \
   tensorflow/serving
```
Note that:
* Port 8500 exposes the gRPC API
* Port 8501 exposes the REST API
* We mount the model directory (with its version subdirectories) under `/models`, the default location where TF Serving looks for models

Some features of TF Serving:
* Automatic detection of new models and their versions
* Graceful model updates
* Simple rollback to the previous version (by deleting the version directory)
* Batching of incoming requests for better GPU utilization on larger input batches, followed by redistribution of the predictions to the appropriate clients
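The batching idea from the last bullet can be sketched in a few lines of NumPy. This is only an illustration of the principle, not TF Serving's implementation; `predict_batched` and `fake_model` are hypothetical names, and `fake_model` merely stands in for a real model call.

```python
import numpy as np


def predict_batched(requests, predict_fn):
    """Run one model call for several client requests, then split the results."""
    sizes = [len(r) for r in requests]
    merged = np.concatenate(requests, axis=0)     # one large input batch
    merged_predictions = predict_fn(merged)       # single (GPU-friendly) call
    split_points = np.cumsum(sizes)[:-1]          # boundaries between clients
    return np.split(merged_predictions, split_points, axis=0)


def fake_model(batch):
    # Placeholder model: returns the mean pixel value of each instance.
    return batch.reshape(len(batch), -1).mean(axis=1, keepdims=True)


client_a = np.zeros((2, 28, 28, 1), dtype=np.float32)
client_b = np.ones((3, 28, 28, 1), dtype=np.float32)
results = predict_batched([client_a, client_b], fake_model)
```

Each entry of `results` has as many rows as the corresponding client's request, so every client gets back exactly its own predictions.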

#### REST API

In [17]:
import json

# Build a request JSON
input_data_json = json.dumps({
    "signature_name": "serving_default",
    "instances": X_new.tolist(),
})

# Print first portion of the JSON data
repr(input_data_json)[:1500] + "..."

'\'{"signature_name": "serving_default", "instances": [[[[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0]], [[0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0

In [18]:
import requests

# Send a POST request with the input JSON payload
response = requests.post(
    f"http://localhost:8501/v1/models/{model_name}:predict",
    data=input_data_json,
)

# Raise an exception in case of error
response.raise_for_status()

# Parse the response JSON
response = response.json()

# Show the response fields
response.keys()

dict_keys(['predictions'])

In [19]:
# Convert predictions to a numpy array and display them
y_proba = np.array(response["predictions"])
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])
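The request above uses the row format (`"instances"`). TF Serving's REST API also accepts a columnar format under the `"inputs"` key, in which case the response is returned under `"outputs"` rather than `"predictions"`. A small sketch of building both payloads follows; the helper name `build_predict_payload` is an assumption made for illustration.

```python
import json


def build_predict_payload(instances, signature_name="serving_default",
                          columnar=False):
    """Serialize a predict request in row ("instances") or columnar ("inputs") format."""
    key = "inputs" if columnar else "instances"
    return json.dumps({"signature_name": signature_name, key: instances})


# Toy data in place of X_new.tolist()
instances = [[0.0, 1.0], [0.5, 0.25]]
row_payload = build_predict_payload(instances)
col_payload = build_predict_payload(instances, columnar=True)
```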

#### gRPC API

In [22]:
import grpc

from tensorflow_serving.apis import prediction_service_pb2_grpc
from tensorflow_serving.apis.predict_pb2 import PredictRequest

# Build the prediction protobuf request
request = PredictRequest()
request.model_spec.name = model_name
request.model_spec.signature_name = "serving_default"
request.inputs[input_name].CopyFrom(tf.make_tensor_proto(X_new))

# Create a gRPC channel and service stub
channel = grpc.insecure_channel('localhost:8500')
predict_service = prediction_service_pb2_grpc.PredictionServiceStub(channel)

# Request the predictions and show the response
response = predict_service.Predict(request, timeout=10.0)
response

outputs {
  key: "dense_1"
  value {
    dtype: DT_FLOAT
    tensor_shape {
      dim {
        size: 3
      }
      dim {
        size: 10
      }
    }
    float_val: 0.00011432634346419945
    float_val: 1.5143777432058414e-07
    float_val: 0.0009806256275624037
    float_val: 0.002772804582491517
    float_val: 3.755267925953376e-06
    float_val: 7.634064240846783e-05
    float_val: 3.913793023002654e-08
    float_val: 0.995567262172699
    float_val: 5.354719905881211e-05
    float_val: 0.0004309909709263593
    float_val: 0.0008170322980731726
    float_val: 3.541000114637427e-05
    float_val: 0.9882665276527405
    float_val: 0.0070426976308226585
    float_val: 1.2937582027916505e-07
    float_val: 0.00023344451619777828
    float_val: 0.0025721604470163584
    float_val: 9.64404556214049e-10
    float_val: 0.0010325429029762745
    float_val: 8.799757011956899e-08
    float_val: 4.441077544470318e-05
    float_val: 0.9703000783920288
    float_val: 0.009060459211468697
   

In [23]:
# Get the name of the output layer from the model
output_name = model.output_names[0]

# Get the protobuf outputs from the response
outputs_proto = response.outputs[output_name]

# Parse and display the predictions
y_proba = tf.make_ndarray(outputs_proto)
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]],
      dtype=float32)

In [24]:
# Alternatively one can parse the predictions directly to numpy (if TF is not available)
shape = [dim.size for dim in outputs_proto.tensor_shape.dim]
y_proba = np.array(outputs_proto.float_val).reshape(shape)
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.97, 0.01, 0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  ]])

### Deploying a new model version

In [25]:
# Set RNG state
np.random.seed(42)
tf.random.set_seed(42)

# Build model v2
model = keras.models.Sequential([
    keras.layers.Flatten(input_shape=[28, 28, 1]),
    keras.layers.Dense(50, activation="relu"),
    keras.layers.Dense(50, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])

model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=keras.optimizers.SGD(learning_rate=1e-2),
    metrics=["accuracy"],
)

# Train the model
history = model.fit(X_train, y_train, epochs=10, validation_data=(X_valid, y_valid))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [28]:
# Make new model version and path
model_version = "0002"
model_path = os.path.join("data", model_name, model_version)

# Save the model as a SavedModel
tf.saved_model.save(model, model_path)

INFO:tensorflow:Assets written to: data/my_mnist_model/0002/assets


In [29]:
!tree data/{model_name}

data/my_mnist_model
├── 0001
│   ├── assets
│   ├── saved_model.pb
│   └── variables
│       ├── variables.data-00000-of-00001
│       └── variables.index
└── 0002
    ├── assets
    ├── saved_model.pb
    └── variables
        ├── variables.data-00000-of-00001
        └── variables.index

6 directories, 6 files


TF Serving will automatically detect the new model version and gracefully deploy it. As in a *blue-green deployment*, it processes all pending requests with the old model while collecting new ones, which are then processed by the new model. This can be reconfigured to swap the active model immediately and process all unprocessed requests with the new version. However, in the latter configuration there is a short period when the service is unavailable.
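By default, the version that gets served is the one with the highest version number among the numeric subdirectories of the model directory. The sketch below mimics that selection rule in plain Python; `latest_version_dir` is a hypothetical helper written for illustration, not part of TF Serving.

```python
import os
import tempfile


def latest_version_dir(model_base_path):
    """Return the name of the highest-numbered version subdirectory, or None."""
    versions = [
        name for name in os.listdir(model_base_path)
        if name.isdigit() and os.path.isdir(os.path.join(model_base_path, name))
    ]
    if not versions:
        return None
    # Compare numerically, so "0010" ranks above "0002".
    return max(versions, key=int)


# Recreate a layout like data/my_mnist_model in a temporary directory.
base = tempfile.mkdtemp()
for version in ("0001", "0002"):
    os.makedirs(os.path.join(base, version))
os.makedirs(os.path.join(base, "notes"))  # non-numeric names are ignored

latest = latest_version_dir(base)
```

This also shows why deleting the `0002` directory acts as a rollback: `0001` becomes the highest remaining version again.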

In [32]:
# Request predictions once again
response = requests.post(
    f"http://localhost:8501/v1/models/{model_name}:predict",
    data=input_data_json,
)
response.raise_for_status()

# Parse the response
response = response.json()

# Extract and show predictions
y_proba = np.array(response["predictions"])
y_proba.round(2)

array([[0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  , 0.  ],
       [0.  , 0.  , 0.99, 0.01, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ],
       [0.  , 0.99, 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ]])
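The REST endpoint used above always targets the version currently being served. TF Serving's REST API also allows a specific version to be addressed with a `/versions/<number>` path segment. The helper below, `predict_url`, is a hypothetical convenience function that only builds such URLs. A request to an older version succeeds only if the server is configured to keep that version loaded; by default only the latest one is.

```python
def predict_url(model_name, version=None, host="localhost", port=8501):
    """Build a TF Serving REST predict URL, optionally pinned to a version."""
    url = f"http://{host}:{port}/v1/models/{model_name}"
    if version is not None:
        # Version directories such as "0001" are addressed by integer value.
        url += f"/versions/{int(version)}"
    return url + ":predict"


latest_url = predict_url("my_mnist_model")
pinned_url = predict_url("my_mnist_model", version="0001")
```

Either URL can be passed to `requests.post` together with the same JSON payload as before.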