# Using TF Serving with AI Platform Prediction Custom Containers (Beta_

This notebook demonstrates how to deploy a TensorFlow 2.x model using AI Platform Prediction Custom Containers (Beta) and TensorFlow Serving.


For the sake of the demonstration, this notebook uses the custom serving module developed in the `01-prepare-for-serving.ipynb` notebook.



In [1]:
import base64
import os
import json
import time
import numpy as np
import tensorflow as tf

import google.auth

from google.auth.credentials import Credentials
from google.auth.transport.requests import AuthorizedSession

from typing import List, Optional, Text, Tuple

## Setting up the environment

This notebook was tested on **AI Platform Notebooks** using the standard TF 2.2 image.

### Set the model store path

Set the `SAVED_MODEL_PATH` to the GCS location of the `SavedModel` created in the `01-prepare-for-serving.ipynb`

In [93]:
SAVED_MODEL_PATH = 'gs://mlops-dev-workspace/models/resnet_serving'

### Push the TF Serving container to the local GCR

In [94]:
_ , project_id = google.auth.default()

cpu_image_name = 'gcr.io/{}/tensorflow_serving:latest-cpu'.format(project_id)
gpu_image_name = 'gcr.io/{}/tensorflow_serving:latest-gpu'.format(project_id)

In [None]:
!docker pull tensorflow/serving:latest
!docker pull tensorflow/serving:latest-gpu

In [None]:
!docker tag tensorflow/serving:latest {cpu_image_name}
!docker tag tensorflow/serving:latest-gpu {gpu_image_name}

In [None]:
!docker push {cpu_image_name}
!docker push {gpu_image_name}

## Deploying the model

Define a helper class that wraps AI Platform Prediction Alpha REST API.

In [95]:
class AIPPClient(object):
    """
    A utility class that wraps AI Platform Prediction Alpha REST API.
    """
    
    def __init__(self, service_endpoint):
        self._service_endpoint = service_endpoint
        credentials, _ = google.auth.default()
        self._authed_session = AuthorizedSession(credentials)

    def list_models(self, project_id: str) -> dict:
        """
        Lists model resources in a project.
        """
    
        url = '{}/v1/projects/{}/models/'.format(self._service_endpoint, project_id)

        response = self._authed_session.get(url)
    
        return response.json()

    def get_model(self, project_id: str, model_name: str) -> dict:
        """
        Retrieves model metadata.
        """
    
        url = '{}/v1/projects/{}/models/{}'.format(self._service_endpoint, project_id, model_name)

        response = self._authed_session.get(url)
    
        return response.json()
    
    
    def create_model(self, project_id: str, model_name: str) -> dict:
        """
        Creates a model resource.
        """
    
        url = '{}/v1/projects/{}/models/'.format(self._service_endpoint, project_id)

        request_body = {
            "name": model_name
        }

    
        response = self._authed_session.post(url, data=json.dumps(request_body))
    
        return response.json()


    def delete_model(self, project_id: str, model_name: str) -> dict:
        """
        Deletes a model resource.
        """
    
        url = '{}/v1/projects/{}/models/{}'.format(self._service_endpoint, project_id, model_name)

    
        response = self._authed_session.delete(url)

        return response.json()


    def create_model_version(
        self,
        project_id: str, 
        model_name: str, 
        version_name: str,
        model_gcs_path: str,
        machine_type: str,
        serving_image: str,
        gpu_count: int=0,
        gpu_type: str=None,
        enable_batching=False,)-> dict:
        """
        Creates a model version resource.
        """
    
        url = '{}/v1/projects/{}/models/{}/versions'.format(self._service_endpoint, project_id, model_name)
        
        args = ["--rest_api_port=8080",
                "--model_name={}".format(model_name),
                "--model_base_path=$(AIP_STORAGE_URI)"]
        
        if enable_batching:
            args.append([
                "--enable_batching",
                "--batching_parameters_file=$(AIP_STORAGE_URI)/batching.pbtxt"
            ])
    
        request_body = {
            "name": version_name,
            "deployment_uri": model_gcs_path,
            "machine_type": machine_type,
            "container": {
                "image": serving_image,
                "args": args
            },
            "routes": {
                "predict": "/v1/models/{}:predict".format(model_name),
                "health": "/v1/models/{}".format(model_name)
            }
        }
        
        if gpu_count > 0:
            accelerator_config = {
                "count": gpu_count,
                "type": gpu_type
            }
            request_body["accelerator_config"] = accelerator_config
            
    
        response = self._authed_session.post(url, data=json.dumps(request_body))

        return response.json()


    def get_model_version(self, project_id: str, model_name: str, version_name: str)-> dict:
        """
        Creates a model version.
        """
    
        url = '{}/v1/projects/{}/models/{}/versions/{}'.format(self._service_endpoint, project_id, model_name, version_name)

        response = self._authed_session.get(url)

        return response.json()


    def delete_model_version(self, project_id: str, model_name: str, version_name: str)-> dict:
        """
        Creates a model version.
        """
    
        url = '{}/v1/projects/{}/models/{}/versions/{}'.format(self._service_endpoint, project_id, model_name, version_name)

        response = self._authed_session.delete(url)

        return response.json()


    def list_model_versions(self, project_id: str, model_name: str)-> dict:
        """
        Lists model versions.
        """
    
        url = '{}/v1/projects/{}/models/{}/versions'.format(self._service_endpoint, project_id, model_name)

        response = self._authed_session.get(url)

        return response.json()


    def call_predict(
        self,
        project_id:str, 
        model_name: str, 
        version_name: str, 
        signature: str,
        instances: list) -> dict:
        """
        Invokes the predict method on the specified signature.
        """
    
        url = '{}/v1/projects/{}/models/{}/versions/{}:predict'.format(self._service_endpoint, project_id, model_name, version_name)    
    
        request_body = {
            'signature_name': signature,
            'instances': encoded_images
        }
    
        response = self._authed_session.post(url, data=json.dumps(request_body))

        return response.json()


### List all models in the project

In [96]:
service_endpoint = 'https://alpha-ml.googleapis.com'

client = AIPPClient(service_endpoint)

In [97]:
client.list_models(project_id)

{'models': [{'name': 'projects/mlops-dev-env/models/ResNet101',
   'regions': ['us-central1'],
   'etag': 'a9Zb+va/g8Y='}]}

### Create a model resource

In [98]:
model_name = 'ResNet101'

client.create_model(project_id, model_name)

{'error': {'code': 409,
  'message': 'Field: model.name Error: A model with the same name already exists.',
  'status': 'ALREADY_EXISTS',
  'details': [{'@type': 'type.googleapis.com/google.rpc.BadRequest',
    'fieldViolations': [{'field': 'model.name',
      'description': 'A model with the same name already exists.'}]}]}}

### Get the model's info

In [8]:
client.get_model(project_id, model_name)

{'name': 'projects/mlops-dev-env/models/ResNet101',
 'regions': ['us-central1'],
 'etag': 'a9Zb+va/g8Y='}

### Create a model version 

In [150]:
client.list_model_versions(project_id, model_name)

{}

#### Create the batch config file

In [151]:
batching_config = 'batching.pbtxt'

In [152]:
%%writefile {batching_config}

max_batch_size { value: 128 }
batch_timeout_micros { value: 150000 }
max_enqueued_batches { value: 16 }
num_batch_threads { value: 8 }


Overwriting batching.pbtxt


#### Copy the batch config file to the model's folder

In [153]:
!gsutil cp {batching_config} {SAVED_MODEL_PATH}/{batching_config}

Copying file://batching.pbtxt [Content-Type=application/octet-stream]...
/ [1 files][  136.0 B/  136.0 B]                                                
Operation completed over 1 objects/136.0 B.                                      


In [154]:
!gsutil cat {SAVED_MODEL_PATH}/batching.pbtxt


max_batch_size { value: 128 }
batch_timeout_micros { value: 150000 }
max_enqueued_batches { value: 16 }
num_batch_threads { value: 8 }


### Provision the model version

In [155]:
version_name = 'batching_150'
image_name = gpu_image_name
machine_type = 'n1-standard-8'
gpu_count = 1
gpu_type = 'NVIDIA_TESLA_P4'
enable_batching = True

client.create_model_version(
    project_id=project_id, 
    model_name=model_name,
    version_name=version_name,
    model_gcs_path=SAVED_MODEL_PATH,
    machine_type=machine_type,
    serving_image=image_name,
    gpu_count=gpu_count,
    gpu_type=gpu_type,
    enable_batching=enable_batching)

{'name': 'projects/mlops-dev-env/operations/create_ResNet101_batching_150-1597353807248',
 'metadata': {'@type': 'type.googleapis.com/google.cloud.ml.v1.OperationMetadata',
  'createTime': '2020-08-13T21:23:28Z',
  'operationType': 'CREATE_VERSION',
  'modelName': 'projects/mlops-dev-env/models/ResNet101',
  'version': {'name': 'projects/mlops-dev-env/models/ResNet101/versions/batching_150',
   'deploymentUri': 'gs://mlops-dev-workspace/models/resnet_serving',
   'createTime': '2020-08-13T21:23:27Z',
   'etag': 'n65JqJRjAcI=',
   'machineType': 'n1-standard-8',
   'acceleratorConfig': {'count': '1', 'type': 'NVIDIA_TESLA_P4'},
   'container': {'image': 'gcr.io/mlops-dev-env/tensorflow_serving:latest-gpu',
    'args': ['--rest_api_port=8080',
     '--model_name=ResNet101',
     '--model_base_path=$(AIP_STORAGE_URI)',
     '--enable_batching',
     '--batching_parameters_file=$(AIP_STORAGE_URI)/batching.pbtxt']},
   'routes': {'predict': '/v1/models/ResNet101:predict',
    'health': '/v1

### Check the deployment status

In [159]:
client.get_model_version(project_id, model_name, version_name)

{'name': 'projects/mlops-dev-env/models/ResNet101/versions/batching_150',
 'isDefault': True,
 'deploymentUri': 'gs://mlops-dev-workspace/models/resnet_serving',
 'createTime': '2020-08-13T21:23:27Z',
 'state': 'READY',
 'etag': 'HT7qYps2w28=',
 'machineType': 'n1-standard-8',
 'acceleratorConfig': {'count': '1', 'type': 'NVIDIA_TESLA_P4'},
 'container': {'image': 'gcr.io/mlops-dev-env/tensorflow_serving:latest-gpu',
  'args': ['--rest_api_port=8080',
   '--model_name=ResNet101',
   '--model_base_path=$(AIP_STORAGE_URI)',
   '--enable_batching',
   '--batching_parameters_file=$(AIP_STORAGE_URI)/batching.pbtxt']},
 'routes': {'predict': '/v1/models/ResNet101:predict',
  'health': '/v1/models/ResNet101'}}

## Testing the model's versions

We will now run inference by invoking the TF Serving `Predict` API.

Refer to the [TF Serving REST API Reference](https://www.tensorflow.org/tfx/serving/api_rest) for more information about the API format.

#### Load sample images

In [19]:
image_folder = 'locust/locust-image/test_images'
raw_images = [tf.io.read_file(os.path.join(image_folder, image_path)).numpy()
         for image_path in os.listdir(image_folder)]

encoded_images = [{'b64': base64.b64encode(image).decode('utf-8')} for image in raw_images]  

#### Call the `predict` endpoint 

In [20]:
signature = 'serving_preprocess'

client.call_predict(
    project_id=project_id, 
    model_name=model_name, 
    version_name=version_name, 
    signature=signature,
    instances=encoded_images)

{'predictions': [{'labels': ['military uniform',
    'suit',
    'Windsor tie',
    'pickelhaube',
    'bow tie'],
   'probabilities': [0.940013826,
    0.0485324822,
    0.00640657172,
    0.00201301626,
    0.000604337547]},
  {'labels': ['Egyptian cat', 'tiger cat', 'tabby', 'lynx', 'Siamese cat'],
   'probabilities': [0.827052057,
    0.131283119,
    0.0410555713,
    0.0005708182,
    1.89249167e-05]}]}

## Cleaning up

### Delete model version and model
#### List model versions

In [146]:
model_name = 'ResNet101'

client.list_model_versions(project_id, model_name)

{'versions': [{'name': 'projects/mlops-dev-env/models/ResNet101/versions/batching_100',
   'isDefault': True,
   'deploymentUri': 'gs://mlops-dev-workspace/models/resnet_serving',
   'createTime': '2020-08-13T19:00:51Z',
   'lastUseTime': '2020-08-13T21:22:27Z',
   'state': 'READY',
   'etag': 'MbwMKzLPDeE=',
   'machineType': 'n1-standard-8',
   'acceleratorConfig': {'count': '1', 'type': 'NVIDIA_TESLA_P4'},
   'container': {'image': 'gcr.io/mlops-dev-env/tensorflow_serving:latest-gpu',
    'args': ['--rest_api_port=8080',
     '--model_name=ResNet101',
     '--model_base_path=$(AIP_STORAGE_URI)',
     '--enable_batching',
     '--batching_parameters_file=$(AIP_STORAGE_URI)/batching.pbtxt']},
   'routes': {'predict': '/v1/models/ResNet101:predict',
    'health': '/v1/models/ResNet101'}}]}

#### Delete the specific version

In [147]:
model_version = 'batching_100'

client.delete_model_version(project_id, model_name, model_version)

{'name': 'projects/mlops-dev-env/operations/delete_ResNet101_batching_100-1597353770668',
 'metadata': {'@type': 'type.googleapis.com/google.cloud.ml.v1.OperationMetadata',
  'createTime': '2020-08-13T21:22:50Z',
  'operationType': 'DELETE_VERSION',
  'modelName': 'projects/mlops-dev-env/models/ResNet101',
  'version': {'name': 'projects/mlops-dev-env/models/ResNet101/versions/batching_100',
   'deploymentUri': 'gs://mlops-dev-workspace/models/resnet_serving',
   'createTime': '2020-08-13T19:00:51Z',
   'state': 'READY',
   'etag': 'MbwMKzLPDeE=',
   'machineType': 'n1-standard-8',
   'acceleratorConfig': {'count': '1', 'type': 'NVIDIA_TESLA_P4'},
   'container': {'image': 'gcr.io/mlops-dev-env/tensorflow_serving:latest-gpu',
    'args': ['--rest_api_port=8080',
     '--model_name=ResNet101',
     '--model_base_path=$(AIP_STORAGE_URI)',
     '--enable_batching',
     '--batching_parameters_file=$(AIP_STORAGE_URI)/batching.pbtxt']},
   'routes': {'predict': '/v1/models/ResNet101:predict

#### Delete the model

In [None]:
client.delete_model(project_id, model_name)

## Next Steps

Walk through the `aipp_deploy.ipynb` notebook to learn how to deploy the custom serving module created in this notebook to **AI Platform Prediction** using TF Serving container image.

## License

<font size=-1>Licensed under the Apache License, Version 2.0 (the \"License\");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at [https://www.apache.org/licenses/LICENSE-2.0](https://www.apache.org/licenses/LICENSE-2.0)

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language governing permissions and limitations under the License.</font>