In [None]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Training an image classification model with a `CustomJob` and getting online predictions

<table align="left">
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/ai-platform-samples/master/ai-platform-unified/notebooks/custom_job_image_classification_model_for_online_prediction.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/ai-platform-samples/master/ai-platform-unified/notebooks/custom_job_image_classification_model_for_online_prediction.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
</table>

# Overview


This tutorial demonstrates how to use the AI Platform (Unified) Python client library to train and deploy a custom image classification model for online prediction.

### Dataset

The dataset used for this tutorial is the [CIFAR10 dataset](https://www.tensorflow.org/datasets/catalog/cifar10) from [TensorFlow Datasets](https://www.tensorflow.org/datasets/catalog/overview). The version of the dataset you will use is built into TensorFlow. The trained model predicts which of ten classes an image belongs to: airplane, automobile, bird, cat, deer, dog, frog, horse, ship, truck.
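To make the ten classes concrete: a classifier like this one outputs ten scores, one per class, and the predicted label is the class with the largest score. The sketch below assumes the standard CIFAR10 label order listed above (index 0 is airplane, index 9 is truck); the helper name `predicted_label` is hypothetical and is not part of the notebook.

```python
# Standard CIFAR10 class order (index -> name), as listed above.
CIFAR10_CLASSES = [
    "airplane", "automobile", "bird", "cat", "deer",
    "dog", "frog", "horse", "ship", "truck",
]


def predicted_label(scores):
    """Return the class name with the highest score (hypothetical helper)."""
    if len(scores) != len(CIFAR10_CLASSES):
        raise ValueError("expected one score per CIFAR10 class")
    # argmax without NumPy: index of the largest score
    best = max(range(len(scores)), key=lambda i: scores[i])
    return CIFAR10_CLASSES[best]


# The fourth score (index 3) is the largest, so the label is "cat".
print(predicted_label([0.01, 0.02, 0.05, 0.60, 0.05, 0.10, 0.05, 0.04, 0.04, 0.04]))
```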

### Objective

In this notebook, you will learn how to create a custom model from a Python script in a Docker container using the AI Platform Python client library, and then make a prediction with the deployed model. You can alternatively create custom models from the command line using the `gcloud` command-line tool or online using the Google Cloud Console.

The steps performed include: 

- Create an AI Platform `CustomJob` resource for training a model.
- Train the model.
- Retrieve and load the model (artifacts).
- View the model evaluation.
- Upload the model as an AI Platform `Model` resource.
- Deploy the model to a serving `Endpoint` resource.
- Make a prediction.
- Undeploy the `Model`.

### Costs 

This tutorial uses billable components of Google Cloud:

* AI Platform (Unified)
* Cloud Storage

Learn about [AI Platform (Unified)
pricing](https://cloud.google.com/ai-platform-unified/pricing) and [Cloud Storage
pricing](https://cloud.google.com/storage/pricing).

## Installation

Install the latest (preview) version of the AI Platform Python client library.

In [None]:
! pip3 install -U google-cloud-aiplatform

Install the Google `cloud-storage` library as well.

In [None]:
! pip3 install google-cloud-storage

Install the *pillow* library for creating test images.

In [None]:
! pip3 install -U pillow

Install TensorFlow to test the model after training.

In [None]:
! pip3 install tensorflow

### Restart the Kernel

Once you've installed the AI Platform Python client library, you need to restart the notebook kernel so it can find the packages.

In [None]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

## Before you begin

### GPU runtime

**Make sure you're running this notebook in a GPU runtime if you have that option. In Colab, select Runtime > Change runtime type > GPU**

### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

2. [Make sure that billing is enabled for your project.](https://cloud.google.com/billing/docs/how-to/modify-project)

3. [Enable the AI Platform API, Compute Engine API and Container Registry API.](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com,compute_component,containerregistry.googleapis.com)

4. The [Cloud SDK](https://cloud.google.com/sdk) is already installed in AI Platform Notebooks.

5. Enter your project ID in the cell below. Then run the cell to make sure the
Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$` into these commands.

#### Project ID

**If you don't know your project ID**, you might be able to get it with the `gcloud` command by executing the second cell below.

In [None]:
PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

In [None]:
if PROJECT_ID == "" or PROJECT_ID is None or PROJECT_ID == "[your-project-id]":
    # Get your GCP project id from gcloud
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID:", PROJECT_ID)

In [None]:
! gcloud config set project $PROJECT_ID

#### Region

You can also change the `REGION` variable, which is used for operations
throughout the rest of this notebook. The following regions are supported for AI Platform. We recommend that you choose the region closest to you.

- Americas: `us-central1`
- Europe: `europe-west4`
- Asia Pacific: `asia-east1`

You must not use a multi-regional bucket for training with AI Platform. Not all regions provide support for all AI Platform services. For the latest support per region, see [the AI Platform locations documentation](https://cloud.google.com/ai-platform-unified/docs/general/locations).

In [None]:
REGION = "us-central1"  # @param {type: "string"}

#### Timestamp

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on the resources created, you create a timestamp for each session and append it to the names of the resources created in this tutorial.

In [None]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

### Authenticate your Google Cloud account

**If you are using AI Platform Notebooks**, your environment is already
authenticated. Skip this step.

In [None]:
import os
import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your Google Cloud account. This provides access
# to your Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

# If on AI Platform Notebooks, then don't execute this code
if not os.path.exists("/opt/deeplearning/metadata/env_version"):
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this tutorial in a notebook locally, replace the string
    # below with the path to your service account key and run this cell to
    # authenticate your Google Cloud account.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS your_path_to_credentials.json

### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**

In this tutorial, you upload a Python package
containing your training code to a Cloud Storage bucket. AI Platform runs
the code from this package. In this tutorial, AI Platform also saves the
trained model that results from your job in the same bucket. You can then
create an AI Platform `Endpoint` based on this output in order to serve
online predictions.

Set the name of your Cloud Storage bucket below. It must be unique across all
Cloud Storage buckets. 

In [None]:
BUCKET_NAME = "[your-bucket-name]"  # @param {type:"string"}

In [None]:
if BUCKET_NAME == "" or BUCKET_NAME is None or BUCKET_NAME == "[your-bucket-name]":
    BUCKET_NAME = PROJECT_ID + "-ucaip-custom-" + TIMESTAMP
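Bucket names must also follow the Cloud Storage naming rules. As a rough pre-check, the sketch below builds a default name with a hyphen between the project ID and the suffix, then validates it against a simplified subset of those rules: 3 to 63 characters; lowercase letters, digits, hyphens, and underscores; starting and ending with a letter or digit. It ignores dotted names and most reserved words, so treat it as an assumption-laden convenience, not a full validator. Both helper names are hypothetical.

```python
import re

# Simplified subset of the Cloud Storage bucket naming rules (no dots):
# 3-63 characters; lowercase letters, digits, hyphens, and underscores;
# must start and end with a letter or digit.
_BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]$")


def default_bucket_name(project_id, timestamp):
    """Build a default bucket name with a separating hyphen (hypothetical helper)."""
    return "{}-ucaip-custom-{}".format(project_id, timestamp)


def is_valid_bucket_name(name):
    """Check the simplified naming rules above; not a complete validator."""
    return bool(_BUCKET_RE.match(name)) and not name.startswith("goog")


name = default_bucket_name("my-project", "20210301120000")
print(name, is_valid_bucket_name(name))
```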

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l $REGION gs://$BUCKET_NAME

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [None]:
! gsutil ls -al gs://$BUCKET_NAME

### Import libraries and define constants

#### Import AI Platform Python client library

Import the AI Platform Python client library into your Python environment.

In [None]:
import os
import sys
import time

from google.cloud.aiplatform import gapic as aip
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value

#### AI Platform constants

Set some constants for AI Platform:

- `API_ENDPOINT`: The AI Platform API service endpoint for the Job, Model, Endpoint, and Prediction services.
- `PARENT`: The AI Platform location root path for dataset, model and endpoint resources.

In [None]:
# API service endpoint
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)

# AI Platform (Unified) location root path for your dataset, model and endpoint resources
PARENT = "projects/" + PROJECT_ID + "/locations/" + REGION
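Every resource you create is identified by a fully qualified name that starts with `PARENT`; a custom job, for example, has a name of the form `projects/<project>/locations/<region>/customJobs/<id>`. The sketch below shows how such names decompose and how the regional endpoint is formed. The project ID and numeric job ID are made-up example values, and `parse_resource_name` is a hypothetical helper (the notebook itself simply uses `JOB_ID.split("/")[-1]` to get the trailing ID).

```python
import re

# Fully qualified AI Platform resource names share the PARENT prefix:
#   projects/{project}/locations/{region}/{collection}/{id}
_RESOURCE_RE = re.compile(
    r"^projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)"
    r"/(?P<collection>[^/]+)/(?P<id>[^/]+)$"
)


def api_endpoint(region):
    """Regional service endpoint, built the same way as API_ENDPOINT above."""
    return "{}-aiplatform.googleapis.com".format(region)


def parse_resource_name(name):
    """Split a fully qualified resource name into its components."""
    match = _RESOURCE_RE.match(name)
    if match is None:
        raise ValueError("not a fully qualified resource name: " + name)
    return match.groupdict()


parts = parse_resource_name(
    "projects/my-project/locations/us-central1/customJobs/1234567890"
)
print(parts["collection"], parts["id"])
```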

#### Hardware Accelerators

Set the hardware accelerators (e.g., GPU), if any, for training and prediction.

Set the variables `TRAIN_GPU/TRAIN_NGPU` and `DEPLOY_GPU/DEPLOY_NGPU` to use a container image supporting a GPU and the number of GPUs allocated to the virtual machine (VM) instance. For example, to use a GPU container image with 4 NVIDIA Tesla K80 GPUs allocated to each VM, specify:

    (aip.AcceleratorType.NVIDIA_TESLA_K80, 4)

For GPU, available accelerators include:
   - aip.AcceleratorType.NVIDIA_TESLA_K80
   - aip.AcceleratorType.NVIDIA_TESLA_P100
   - aip.AcceleratorType.NVIDIA_TESLA_P4
   - aip.AcceleratorType.NVIDIA_TESLA_T4
   - aip.AcceleratorType.NVIDIA_TESLA_V100

   
Otherwise specify `(None, None)` to use a container image to run on a CPU.
   
*Note*: GPU builds of TF releases before 2.3 fail to load the custom model in this tutorial. This is a known issue, caused by static graph ops that are generated in the serving function, and it is fixed in TF 2.3. If you encounter this issue on your own custom models, the workaround is to create your own Docker container image for TF 2.3 with GPU support.

In [None]:
TRAIN_GPU, TRAIN_NGPU = (aip.AcceleratorType.NVIDIA_TESLA_K80, 1)
DEPLOY_GPU, DEPLOY_NGPU = (None, None)

#### Container (Docker) image

Next, set the Docker container images for training and prediction.

- Set the variable `TF` to the TensorFlow version of the container image. For example, `2-1` would be version 2.1, and `1-15` would be version 1.15. The following list shows some of the pre-built images available:

 - TensorFlow 1.15
   - `gcr.io/cloud-aiplatform/training/tf-cpu.1-15:latest`
   - `gcr.io/cloud-aiplatform/training/tf-gpu.1-15:latest`
   - `gcr.io/cloud-aiplatform/prediction/tf-cpu.1-15:latest`
   - `gcr.io/cloud-aiplatform/prediction/tf-gpu.1-15:latest`
 - TensorFlow 2.1
   - `gcr.io/cloud-aiplatform/training/tf-cpu.2-1:latest`
   - `gcr.io/cloud-aiplatform/training/tf-gpu.2-1:latest`
   - `gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-1:latest`
   - `gcr.io/cloud-aiplatform/prediction/tf2-gpu.2-1:latest`
 - TensorFlow 2.2
   - `gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest`
   - `gcr.io/cloud-aiplatform/training/tf-gpu.2-2:latest`
   - `gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest`
   - `gcr.io/cloud-aiplatform/prediction/tf2-gpu.2-2:latest`
 - XGBoost
   - `gcr.io/cloud-aiplatform/training/xgboost-cpu.1-1`
   - `gcr.io/cloud-aiplatform/prediction/xgboost-cpu.1-1`
 - Scikit-learn
   - `gcr.io/cloud-aiplatform/training/scikit-learn-cpu.0-23`
   - `gcr.io/cloud-aiplatform/prediction/scikit-learn-cpu.0-23`
 - PyTorch
   - `gcr.io/cloud-aiplatform/training/pytorch-cpu.1-4:latest`
   - `gcr.io/cloud-aiplatform/training/pytorch-gpu.1-4:latest`
   
AI Platform frequently adds new training and prediction container images. For the latest list, see [Pre-built containers for training](https://cloud.google.com/ai-platform-unified/docs/training/pre-built-containers) and [Pre-built containers for prediction](https://cloud.google.com/ai-platform-unified/docs/predictions/pre-built-containers).

In [None]:
TF = "2-1"
if TF[0] == "2":
    if TRAIN_GPU:
        TRAIN_VERSION = "tf-gpu.{}".format(TF)
    else:
        TRAIN_VERSION = "tf-cpu.{}".format(TF)
    if DEPLOY_GPU:
        DEPLOY_VERSION = "tf2-gpu.{}".format(TF)
    else:
        DEPLOY_VERSION = "tf2-cpu.{}".format(TF)
else:
    if TRAIN_GPU:
        TRAIN_VERSION = "tf-gpu.{}".format(TF)
    else:
        TRAIN_VERSION = "tf-cpu.{}".format(TF)
    if DEPLOY_GPU:
        DEPLOY_VERSION = "tf-gpu.{}".format(TF)
    else:
        DEPLOY_VERSION = "tf-cpu.{}".format(TF)

TRAIN_IMAGE = "gcr.io/cloud-aiplatform/training/{}:latest".format(TRAIN_VERSION)
DEPLOY_IMAGE = "gcr.io/cloud-aiplatform/prediction/{}:latest".format(DEPLOY_VERSION)

print("Training:", TRAIN_IMAGE, TRAIN_GPU, TRAIN_NGPU)
print("Deployment:", DEPLOY_IMAGE, DEPLOY_GPU, DEPLOY_NGPU)
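The branching in the previous cell follows a single rule: training images always use the `tf-` prefix, while prediction images use `tf2-` for TensorFlow 2.x and `tf-` for 1.x, with `gpu` or `cpu` chosen by whether an accelerator is set. The sketch below condenses that rule into one function that produces the same URIs as the cell above. The function name `container_images` is hypothetical, and it only covers the TensorFlow images listed earlier.

```python
def container_images(tf_version, train_gpu, deploy_gpu):
    """Return (train_image, deploy_image) URIs for pre-built TF containers.

    Mirrors the cell above: TF 2.x prediction images use the "tf2-" prefix,
    while training images (and TF 1.x prediction images) use "tf-".
    """
    train_flavor = "gpu" if train_gpu else "cpu"
    deploy_flavor = "gpu" if deploy_gpu else "cpu"
    deploy_prefix = "tf2" if tf_version.startswith("2") else "tf"
    train_image = "gcr.io/cloud-aiplatform/training/tf-{}.{}:latest".format(
        train_flavor, tf_version
    )
    deploy_image = "gcr.io/cloud-aiplatform/prediction/{}-{}.{}:latest".format(
        deploy_prefix, deploy_flavor, tf_version
    )
    return train_image, deploy_image


print(container_images("2-1", train_gpu=True, deploy_gpu=None))
```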

#### Machine Type

Next, set the machine type to use for training and prediction.

- Set the variables `TRAIN_COMPUTE` and `DEPLOY_COMPUTE` to configure the compute resources for the VMs you will use for training and prediction.
 - `machine type`
     - `n1-standard`: 3.75 GB of memory per vCPU
     - `n1-highmem`: 6.5 GB of memory per vCPU
     - `n1-highcpu`: 0.9 GB of memory per vCPU
 - `vCPUs`: one of \[2, 4, 8, 16, 32, 64, 96\]

*Note: The following machine types are not supported for training:*

 - `standard`: 2 vCPUs
 - `highcpu`: 2, 4 and 8 vCPUs

*Note: You may also use n2 and e2 machine types for training and deployment, but they do not support GPUs.*

In [None]:
MACHINE_TYPE = "n1-standard"
VCPU = "4"
TRAIN_COMPUTE = MACHINE_TYPE + "-" + VCPU
print("Train machine type", TRAIN_COMPUTE)

MACHINE_TYPE = "n1-standard"
VCPU = "4"
DEPLOY_COMPUTE = MACHINE_TYPE + "-" + VCPU
print("Deploy machine type", DEPLOY_COMPUTE)
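The machine type string is just `<family>-<vCPUs>`, so a mistyped family or vCPU count only surfaces when the job is submitted. The sketch below builds the string and checks it against the n1 families and vCPU counts listed above. It reads the training note's `standard` and `highcpu` entries as `n1-standard-2` and `n1-highcpu-2/4/8`; that interpretation is an assumption. The helper name `machine_type` is hypothetical, and it deliberately ignores the n2 and e2 families.

```python
# Families and vCPU counts from the list above.
MACHINE_FAMILIES = {"n1-standard", "n1-highmem", "n1-highcpu"}
VCPU_COUNTS = {2, 4, 8, 16, 32, 64, 96}
# Assumed reading of the "not supported for training" note above.
UNSUPPORTED_FOR_TRAINING = {"n1-standard-2", "n1-highcpu-2", "n1-highcpu-4", "n1-highcpu-8"}


def machine_type(family, vcpus, for_training=False):
    """Build a machine type string such as "n1-standard-4" and sanity-check it."""
    if family not in MACHINE_FAMILIES:
        raise ValueError("unknown machine family: " + family)
    if int(vcpus) not in VCPU_COUNTS:
        raise ValueError("unsupported vCPU count: {}".format(vcpus))
    name = "{}-{}".format(family, int(vcpus))
    if for_training and name in UNSUPPORTED_FOR_TRAINING:
        raise ValueError(name + " is not supported for training")
    return name


print(machine_type("n1-standard", "4", for_training=True))
```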

# Tutorial

Now you are ready to start creating your own custom model and training it on CIFAR10.

## Clients

The AI Platform Python client library follows a client/server model. In your Python script, you create a client that sends requests to, and receives responses from, the AI Platform server.

You use several clients in this tutorial, so set them all up upfront.

- Job Service for custom jobs.
- Model Service for managed models.
- Endpoint Service for deployment.
- Prediction Service for serving.

In [None]:
# client options same for all services
client_options = {"api_endpoint": API_ENDPOINT}
predict_client_options = {"api_endpoint": API_ENDPOINT}


def create_job_client():
    client = aip.JobServiceClient(client_options=client_options)
    return client


def create_model_client():
    client = aip.ModelServiceClient(client_options=client_options)
    return client


def create_endpoint_client():
    client = aip.EndpointServiceClient(client_options=client_options)
    return client


def create_prediction_client():
    client = aip.PredictionServiceClient(client_options=predict_client_options)
    return client


clients = {}
clients["job"] = create_job_client()
clients["model"] = create_model_client()
clients["endpoint"] = create_endpoint_client()
clients["prediction"] = create_prediction_client()

for client in clients.items():
    print(client)

## Prepare your `CustomJob` specification

Now that your clients are ready, your first step is to create a `CustomJob` specification for your custom training job.

To practice using the Job service, start by training an **empty job**. In other words, create a `CustomJob` specification that provisions resources for training a job, and initiate the job using the client library's Job service, but configure the `CustomJob` so it doesn't actually train an ML model.

This lets you focus on understanding the basic steps. Afterwards, you can create another `CustomJob` with a focus on adding the Python training package for training a CIFAR10 custom model.

### Define a container specification

Let's first start by defining a job name and then a container specification:

- `JOB_NAME`: A unique name for your custom training job. For convenience, append a timestamp to make the name unique.
- `MODEL_DIR`: A location in your Cloud Storage bucket for storing the model artifacts.
- `image_uri`: The location of the container image in Artifact Registry, Container Registry, or Docker Hub. This can be either a Google Cloud pre-built image or your own container image.
- `--model-dir`: A command-line parameter to the container indicating the location to store the model.

In [None]:
JOB_NAME = "custom_job_" + TIMESTAMP
MODEL_DIR = "gs://{}/{}".format(BUCKET_NAME, JOB_NAME)
CONTAINER_SPEC = {
    "image_uri": TRAIN_IMAGE,
    "args": ["--model-dir=" + MODEL_DIR],
}

### Define the worker pool specification


Next, you define the worker pool specification for your custom training job. This tells AI Platform what type and how many VMs to provision for the training.

For this tutorial, you will use a single instance (node). 

- `replica_count`: The number of VMs to provision of this machine type.
- `machine_type`: The type of VM to provision -- e.g., n1-standard-8.
- `accelerator_type`: The type of hardware accelerator, if any. In this tutorial, if you previously set `TRAIN_GPU` to an accelerator type (not `None`), you use a GPU; otherwise you use a CPU.
- `accelerator_count`: The number of accelerators.
- `container_spec`: The Docker container to install on the instances.

In [None]:
if TRAIN_GPU:
    machine_spec = {
        "machine_type": TRAIN_COMPUTE,
        "accelerator_type": TRAIN_GPU,
        "accelerator_count": TRAIN_NGPU,
    }
else:
    machine_spec = {"machine_type": TRAIN_COMPUTE, "accelerator_count": 0}

WORKER_POOL_SPEC = [
    {
        "replica_count": 1,
        "machine_spec": machine_spec,
        "container_spec": CONTAINER_SPEC,
    }
]
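The GPU/CPU branch that builds `machine_spec` above is repeated later for the CIFAR10 job, so it can be factored into a small helper. This is a sketch: `make_machine_spec` is hypothetical, and the string `"NVIDIA_TESLA_K80"` is a placeholder so the example runs without the client library; in the notebook you would pass an enum value such as `aip.AcceleratorType.NVIDIA_TESLA_K80` instead.

```python
def make_machine_spec(machine_type, accelerator_type=None, accelerator_count=None):
    """Return a machine_spec dict in the same shape as the cell above.

    With an accelerator, the type and count are included; without one,
    only the machine type and a zero accelerator count are set.
    """
    if accelerator_type:
        return {
            "machine_type": machine_type,
            "accelerator_type": accelerator_type,
            "accelerator_count": accelerator_count,
        }
    return {"machine_type": machine_type, "accelerator_count": 0}


# Placeholder accelerator string; use the aip.AcceleratorType enum in practice.
print(make_machine_spec("n1-standard-4", "NVIDIA_TESLA_K80", 1))
print(make_machine_spec("n1-standard-4"))
```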

If you were doing distributed training, you would add a second machine description and set the replica count accordingly. In the example below, the first machine description is the primary (coordinator), and the second describes the worker machines that the training is distributed to.

```
WORKER_POOL_SPEC = [
    {
        "replica_count": 1,
        "machine_spec": {"machine_type": "n1-standard-8"},
        "container_spec": CONTAINER_SPEC,
    },
    {
        "replica_count": 6,
        "machine_spec": {"machine_type": "n1-standard-8"},
        "container_spec": CONTAINER_SPEC,
    },
]
```
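The distributed layout is a list with exactly one primary replica followed by an optional pool of workers. The sketch below generates that list for any worker count, using the same machine type for both pools as the example does. The helper name `distributed_worker_pool_specs` and the image URI `gcr.io/example/image:latest` are hypothetical placeholders.

```python
def distributed_worker_pool_specs(container_spec, machine_type, num_workers):
    """Return a primary pool plus an optional worker pool (hypothetical helper).

    The first entry is the primary (coordinator) with exactly one replica;
    the second, added only when num_workers > 0, holds the worker replicas.
    """
    specs = [
        {
            "replica_count": 1,
            "machine_spec": {"machine_type": machine_type},
            "container_spec": container_spec,
        }
    ]
    if num_workers > 0:
        specs.append(
            {
                "replica_count": num_workers,
                "machine_spec": {"machine_type": machine_type},
                "container_spec": container_spec,
            }
        )
    return specs


example_container = {"image_uri": "gcr.io/example/image:latest", "args": []}
pools = distributed_worker_pool_specs(example_container, "n1-standard-8", 6)
print([pool["replica_count"] for pool in pools])
```

With `num_workers=0` the function returns a single-entry list, which matches the single-node specification used in this tutorial.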

### Assemble the job specification

Assemble the description for the `CustomJob` specification.

- `display_name`: The human readable name you assign to this `CustomJob`.
- `job_spec`: The specification for the `CustomJob`. Because this is an empty job, it specifies only the resource requirements.

In [None]:
CUSTOM_JOB = {
    "display_name": JOB_NAME,
    "job_spec": {"worker_pool_specs": WORKER_POOL_SPEC},
}

## Train the model

Start the training of your custom training job on AI Platform. Use this helper function `create_custom_job`, which takes the following parameter:

- `custom_job`: The specification for the `CustomJob`.

The helper function calls the job client service's `create_custom_job` method, with the following parameters:

- `parent`: The AI Platform location path to `Dataset`, `Model` and `Endpoint` resources.
- `custom_job`: The specification for the `CustomJob`.

The helper function displays a handful of the fields returned in the `response` object. The two of most interest are the following:

- `response.name`: The AI Platform fully qualified identifier assigned to this `CustomJob`. Save this identifier for use in subsequent steps.
- `response.state`: The current state of the `CustomJob`. 

In [None]:
def create_custom_job(custom_job):
    response = clients["job"].create_custom_job(parent=PARENT, custom_job=custom_job)
    print("name:", response.name)
    print("display_name:", response.display_name)
    print("state:", response.state)
    print("create_time:", response.create_time)
    print("update_time:", response.update_time)
    return response.name


# Save the job name
JOB_ID = create_custom_job(CUSTOM_JOB)

### List all `CustomJob` resources

Now that your `CustomJob` is running, get a list for all the `CustomJob` resources associated with your `PROJECT_ID`. This will probably be just one job, unless you've been running this tutorial multiple times or otherwise been using the AI Platform job service.

Use the helper function `list_custom_jobs`, which calls the job client service's `list_custom_jobs` method. The response is an iterable (a pager) in which each element is a separate job.

The `response` object for each `CustomJob` contains:

- `name`: The AI Platform fully qualified identifier for your custom training job.
- `display_name`: The human readable name you assigned to your custom training job.
- `job_spec`: The job specification you provided for your custom training job.
- `state`: The current status of the `CustomJob`.
- `start_time`: When the custom training job started running.
- `end_time`: When the execution of the custom job ended.
- `update_time`: When the `CustomJob` was most recently updated.

In [None]:
def list_custom_jobs():
    response = clients["job"].list_custom_jobs(parent=PARENT)
    for job in response:
        print(job)


list_custom_jobs()

### Get information on a custom job

Next, use this helper function `get_custom_job`, which takes the following parameter:

- `name`: The AI Platform fully qualified identifier for the `CustomJob`.

The helper function gets the job information for just this job by calling the job client service's `get_custom_job` method, with the following parameter:

- `name`: The AI Platform fully qualified identifier for the `CustomJob`.

Recall that you got the AI Platform fully qualified identifier for the custom job in the `response.name` field when you called the `create_custom_job` method, and you saved the identifier in the variable `JOB_ID`.

In [None]:
def get_custom_job(name, silent=False):
    response = clients["job"].get_custom_job(name=name)
    if silent:
        return response

    print("name:", response.name)
    print("display_name:", response.display_name)
    print("state:", response.state)
    print("create_time:", response.create_time)
    print("update_time:", response.update_time)
    return response


get_custom_job(JOB_ID)
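Because `get_custom_job` returns a snapshot, waiting for a job to finish means polling its `state` until it reaches a terminal value. The sketch below is a generic poller that takes the state lookup as a callable, so it can be tested without the service. The terminal-state names follow the job state enum (`JOB_STATE_SUCCEEDED`, `JOB_STATE_FAILED`, `JOB_STATE_CANCELLED`), but the set may not be exhaustive for every API version. The function `wait_for_job` and its parameters are hypothetical.

```python
import time

# Terminal job states (names from the JobState enum); may not be exhaustive.
TERMINAL_STATES = {"JOB_STATE_SUCCEEDED", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}


def wait_for_job(get_state, poll_seconds=60, timeout_seconds=3600, sleep=time.sleep):
    """Poll get_state() until it returns a terminal state or the timeout expires.

    get_state: zero-argument callable returning the state name as a string.
    Returns the final state name; raises TimeoutError if still not terminal.
    """
    waited = 0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_seconds:
            raise TimeoutError("job still in state {} after {}s".format(state, waited))
        sleep(poll_seconds)
        waited += poll_seconds
```

In the notebook you might call it as `wait_for_job(lambda: get_custom_job(JOB_ID, silent=True).state.name)`, assuming the returned `state` field is an enum member that exposes its name through `.name`.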

### View logs

In addition to the job state information above, you can also look at the logs associated with the `CustomJob`.

#### View logs in the AI Platform section of the Cloud Console

You can view logs in the AI Platform section of the Cloud Console. The cell below will display a link. Paste the link in the address bar of another tab in your browser. It will display information about your job.

In [None]:
print(
    "https://console.cloud.google.com/ai/platform/locations/{region}/training/{job_id}?project={project_id}".format(
        region=REGION, job_id=JOB_ID.split("/")[-1], project_id=PROJECT_ID
    )
)

#### View logs in Cloud Logging

You can also view the logs in Cloud Logging. The cell below will display a link. Paste the link in the address bar of another tab in your browser. It will display logs for your job.

In [None]:
print(
    "https://console.cloud.google.com/logs/viewer?resource=ml_job%2Fjob_id%2F{job_id}&project={project_id}".format(
        job_id=JOB_ID.split("/")[-1], project_id=PROJECT_ID
    )
)
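The Cloud Logging link above hand-encodes the slashes in the resource filter as `%2F`. Building the query with `urllib.parse.urlencode` produces the same escaping automatically, which avoids mistakes if you adapt the filter. This sketch assumes the same legacy logs viewer URL and `ml_job/job_id/<id>` filter as the cell above; the helper name `logs_url` is hypothetical.

```python
from urllib.parse import urlencode


def logs_url(job_name, project_id):
    """Build the Cloud Logging link from a fully qualified CustomJob name.

    urlencode() percent-encodes the slashes in the resource filter, producing
    the same "ml_job%2Fjob_id%2F..." form as the hand-written URL above.
    """
    job_id = job_name.split("/")[-1]
    query = urlencode({"resource": "ml_job/job_id/" + job_id, "project": project_id})
    return "https://console.cloud.google.com/logs/viewer?" + query


print(logs_url("projects/my-project/locations/us-central1/customJobs/123", "my-project"))
```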

### Cancel a `CustomJob`

If you want, you can cancel your "empty" training job. Use this helper function `cancel_job`, with the following parameter:

- `name`: The AI Platform fully qualified identifier for your custom training job.

The helper function will call the job service client's `cancel_custom_job` method, with the following parameter:

- `name`: The AI Platform fully qualified identifier for your custom training job.

Wrap the call in a try/except because it raises an exception if the job has already completed (succeeded), which is likely by this point.

In [None]:
def cancel_job(name):
    try:
        response = clients["job"].cancel_custom_job(name=name)
        print(response)
    except Exception as e:
        print(e)


time.sleep(10)
cancel_job(JOB_ID)

### Delete a custom job

Next, you can delete your "empty" training job. Use the helper function `delete_job`, with the following parameter:

- `name`: The AI Platform fully qualified identifier for your custom training job.

The helper function will call the job service client's `delete_custom_job` method, with the parameter:

- `name`: The AI Platform fully qualified identifier for your custom training job.

Afterwards, you can verify that the job has been deleted by calling `get_custom_job` for the same job. Wrap that call in a try/except because it raises an exception if the job has already been deleted, which is likely by this point.

In [None]:
def delete_job(name):
    try:
        response = clients["job"].delete_custom_job(name=name)
        print("Delete", response)
    except Exception as e:
        print(e)

    try:
        response = clients["job"].get_custom_job(name=name)
    except Exception as e:
        print(e)


time.sleep(10)
delete_job(JOB_ID)

## Train a model - CIFAR10

Now that you have seen the basic steps for custom training, you will create a new custom job to train a model. There are two ways you can train a custom model using a container image:

- **Use an AI Platform pre-built container**. If you use a pre-built container, you additionally specify a Python package to install into the container image. This Python package contains your code for training a custom model.

- **Use your own custom container image**. If you use your own container, the container needs to contain your code for training a custom model.


In this tutorial, you will train a CIFAR10 model using a pre-built container. You need to update the worker pool specification by adding a description for `python_package_spec`. This section will tell the custom job the Python training package to install and which Python module to invoke, along with command line arguments for the Python module.

The Python package specification contains the following fields:

- `executor_image_uri`: The Docker image configured for your custom training job. You continue to use the same image as in the earlier demonstration.

- `package_uris`: A list of the locations (URIs) of your Python training packages to install on the provisioned instance. The locations must be in a Cloud Storage bucket. These can be either individual Python files or an archive of an entire package. In the latter case, the job service unpacks (unarchives) the contents into the Docker container.

- `python_module`: The Python module (script) to invoke for running the custom training job. In this example, you invoke `trainer.task` (the file `trainer/task.py`); note that you omit the `.py` suffix.

- `args`: The command-line arguments to pass to the Python module. In this example, you pass:
  - `"--model-dir=" + MODEL_DIR`: The Cloud Storage location where to store the model artifacts. There are two ways to tell the training script where to save the model artifacts:
      - direct: You pass the Cloud Storage location as a command-line argument to your training script (set the variable `DIRECT = True`), or
      - indirect: The service passes the Cloud Storage location to your training script as the environment variable `AIP_MODEL_DIR` (set the variable `DIRECT = False`). In this case, you tell the service the model artifact location in the job specification.
  - `"--epochs=" + str(EPOCHS)`: The number of epochs for training.
  - `"--steps=" + str(STEPS)`: The number of steps (batches) per epoch.
  - `"--distribute=" + TRAIN_STRATEGY`: The training distribution strategy to use for single or distributed training.
     - `"single"`: single device.
     - `"mirror"`: all GPU devices on a single VM.
     - `"multi"`: all GPU devices on all VMs.

In [None]:
if TRAIN_GPU:
    machine_spec = {
        "machine_type": TRAIN_COMPUTE,
        "accelerator_type": TRAIN_GPU,
        "accelerator_count": TRAIN_NGPU,
    }
else:
    machine_spec = {"machine_type": TRAIN_COMPUTE, "accelerator_count": 0}

if not TRAIN_NGPU or TRAIN_NGPU < 2:
    TRAIN_STRATEGY = "single"
else:
    TRAIN_STRATEGY = "mirror"

EPOCHS = 20
STEPS = 100

DIRECT = True
if DIRECT:
    CMDARGS = [
        "--model-dir=" + MODEL_DIR,
        "--epochs=" + str(EPOCHS),
        "--steps=" + str(STEPS),
        "--distribute=" + TRAIN_STRATEGY,
    ]
else:
    CMDARGS = [
        "--epochs=" + str(EPOCHS),
        "--steps=" + str(STEPS),
        "--distribute=" + TRAIN_STRATEGY,
    ]

WORKER_POOL_SPEC = [
    {
        "replica_count": 1,
        "machine_spec": machine_spec,
        "python_package_spec": {
            "executor_image_uri": TRAIN_IMAGE,
            "package_uris": ["gs://" + BUCKET_NAME + "/trainer_cifar.tar.gz"],
            "python_module": "trainer.task",
            "args": CMDARGS,
        },
    }
]
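The cell above only chooses between `"single"` and `"mirror"`, because it provisions a single replica. The sketch below extends the same rule to the `"multi"` case described in the argument list: more than one VM selects `"multi"`, one VM with two or more GPUs selects `"mirror"`, and anything else selects `"single"`. The function `choose_strategy` is hypothetical, and treating `replica_count` as the total number of VMs across all worker pools is an assumption.

```python
def choose_strategy(num_gpus, replica_count=1):
    """Pick the --distribute value for the training script (hypothetical helper).

    - "multi":  more than one VM (all GPU devices on all VMs)
    - "mirror": one VM with two or more GPUs
    - "single": otherwise (one GPU, or CPU only)
    replica_count is assumed to be the total number of VMs in the job.
    """
    if replica_count > 1:
        return "multi"
    if num_gpus and num_gpus >= 2:
        return "mirror"
    return "single"


print(choose_strategy(4))
```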

### Assemble a job specification

Now assemble the description for the `CustomJob` specification:

- `display_name`: The human-readable name you assign to this `CustomJob`.
- `job_spec`: The specification for the `CustomJob`. 
    - `base_output_directory`: This tells the service the Cloud Storage location where to save the model artifacts (when variable `DIRECT = False`). The service will then pass the location to the training script as the environment variable `AIP_MODEL_DIR`, and the path will be of the form:
    
                <output_uri_prefix>/model

In [None]:
if DIRECT:
    JOB_SPEC = {"worker_pool_specs": WORKER_POOL_SPEC}
else:
    JOB_SPEC = {
        "worker_pool_specs": WORKER_POOL_SPEC,
        "base_output_directory": {"output_uri_prefix": MODEL_DIR},
    }

CUSTOM_JOB = {"display_name": JOB_NAME, "job_spec": JOB_SPEC}
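The difference between direct and indirect mode comes down to where the training script finds the model directory. The sketch below reproduces the fallback used by the training script later in this notebook: an explicit `--model-dir` wins, and otherwise `argparse` defaults to the `AIP_MODEL_DIR` environment variable. It is a reduced, hypothetical version (`parse_model_dir`) that uses `parse_known_args` to ignore the other flags, and the bucket paths are made-up example values.

```python
import argparse
import os


def parse_model_dir(argv):
    """Resolve the model directory with an AIP_MODEL_DIR fallback.

    An explicit --model-dir wins (direct mode); otherwise argparse falls back
    to the AIP_MODEL_DIR environment variable (indirect mode). The default is
    read each time this function runs, not at import time.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--model-dir", dest="model_dir",
                        default=os.getenv("AIP_MODEL_DIR"), type=str)
    args, _ = parser.parse_known_args(argv)
    return args.model_dir


os.environ["AIP_MODEL_DIR"] = "gs://example-bucket/custom_job/model"  # hypothetical value
print(parse_model_dir(["--epochs=20"]))  # indirect: taken from the environment
print(parse_model_dir(["--model-dir=gs://example-bucket/direct"]))  # direct: flag wins
```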

### Examine the training package

#### Package layout

Before you start the training, look at how a Python package is assembled for a custom training job. When unarchived, the package contains the following directory/file layout.

- PKG-INFO
- README.md
- setup.cfg
- setup.py
- trainer
  - \_\_init\_\_.py
  - task.py

The files `setup.cfg` and `setup.py` are the instructions for installing the package into the operating environment of the Docker container.

The file `trainer/task.py` is the Python script for executing the `CustomJob`. *Note*: when you refer to it in the worker pool specification, you replace the directory slash with a dot (`trainer.task`) and drop the file suffix (`.py`).
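The path-to-module conversion described above is mechanical: drop the `.py` suffix and join the path components with dots. The sketch below shows it with `pathlib`; the helper name `module_name` is hypothetical.

```python
from pathlib import PurePosixPath


def module_name(script_path):
    """Convert a package-relative script path to the dotted python_module name."""
    path = PurePosixPath(script_path)
    if path.suffix != ".py":
        raise ValueError("expected a .py file: " + script_path)
    return ".".join(path.with_suffix("").parts)


print(module_name("trainer/task.py"))  # trainer.task
```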

#### Package Assembly

In the following cells, you will assemble the training package.

In [None]:
# Make folder for Python training script
! rm -rf custom
! mkdir custom

# Add package information
! touch custom/README.md

setup_cfg = "[egg_info]\n\
tag_build =\n\
tag_date = 0"
! echo "$setup_cfg" > custom/setup.cfg

setup_py = "import setuptools\n\
# Requires TensorFlow Datasets\n\
setuptools.setup(\n\
    install_requires=[\n\
        'tensorflow_datasets==1.3.0',\n\
    ],\n\
    packages=setuptools.find_packages())"
! echo "$setup_py" > custom/setup.py

pkg_info = "Metadata-Version: 1.0\n\
Name: UNKNOWN\n\
Version: 0.0.0\n\
Summary: Demonstration training script\n\
Home-page: www.google.com\n\
Author: Google\n\
Author-email: UNKNOWN\n\
License: Public\n\
Description: Demo\n\
Platform: AI Platform (Unified)"
! echo "$pkg_info" > custom/PKG-INFO

# Make the training subfolder
! mkdir custom/trainer
! touch custom/trainer/__init__.py
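If you prefer to assemble the package from Python instead of shell commands, the same layout can be created with `pathlib`. A minimal sketch that writes into a temporary directory so it does not touch your working `custom/` folder; `assemble_package` is a hypothetical helper, and it omits `PKG-INFO` and `trainer/task.py` for brevity:

```python
import tempfile
from pathlib import Path

SETUP_CFG = "[egg_info]\ntag_build =\ntag_date = 0\n"
SETUP_PY = (
    "import setuptools\n"
    "# Requires TensorFlow Datasets\n"
    "setuptools.setup(\n"
    "    install_requires=['tensorflow_datasets==1.3.0'],\n"
    "    packages=setuptools.find_packages())\n"
)


def assemble_package(root):
    """Create the custom/ package layout under root and return the package directory."""
    pkg = Path(root) / "custom"
    (pkg / "trainer").mkdir(parents=True, exist_ok=True)
    (pkg / "README.md").touch()
    (pkg / "setup.cfg").write_text(SETUP_CFG)
    (pkg / "setup.py").write_text(SETUP_PY)
    (pkg / "trainer" / "__init__.py").touch()
    return pkg


with tempfile.TemporaryDirectory() as tmp:
    pkg = assemble_package(tmp)
    print(sorted(p.relative_to(pkg).as_posix() for p in pkg.rglob("*")))
    # ['README.md', 'setup.cfg', 'setup.py', 'trainer', 'trainer/__init__.py']
```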

#### Task.py contents

In the next cell, you will write the contents of the training script `task.py`. In summary, the training script does the following:

- Gets the directory in which to save the model artifacts from the command line (`--model-dir`), and if not specified, then from the environment variable `AIP_MODEL_DIR`.
- Loads the CIFAR10 dataset from TF Datasets (tfds).
- Builds a simple ConvNet model using the `tf.keras` API.
- Compiles the model (`compile()`).
- Sets a training distribution strategy according to the argument `args.distribute`.
- Trains the model (`fit()`) with epochs and steps according to the arguments `args.epochs` and `args.steps`.
- Saves the trained model (`save(args.model_dir)`) to the specified model directory.
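The `--model-dir` fallback works because `argparse` evaluates `os.getenv("AIP_MODEL_DIR")` when the argument is defined, so the environment value becomes the default unless the flag is passed. A minimal sketch that mirrors the `--model-dir` and `--epochs` arguments in `task.py`; the bucket path is a placeholder:

```python
import argparse
import os


def parse_args(argv=None):
    parser = argparse.ArgumentParser()
    # The default is read when the parser is built, so AIP_MODEL_DIR must be set beforehand.
    parser.add_argument("--model-dir", dest="model_dir",
                        default=os.getenv("AIP_MODEL_DIR"), type=str)
    parser.add_argument("--epochs", dest="epochs", default=10, type=int)
    return parser.parse_args(argv)


os.environ["AIP_MODEL_DIR"] = "gs://my-bucket/cifar10/model"
print(parse_args([]).model_dir)  # gs://my-bucket/cifar10/model
print(parse_args(["--model-dir", "/tmp/local"]).model_dir)  # /tmp/local
```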

In [None]:
%%writefile custom/trainer/task.py
# Single, Mirror and Multi-Machine Distributed Training for CIFAR-10

import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow.python.client import device_lib
import argparse
import os
import sys
tfds.disable_progress_bar()

parser = argparse.ArgumentParser()
parser.add_argument('--model-dir', dest='model_dir',
                    default=os.getenv("AIP_MODEL_DIR"), type=str, help='Model dir.')
parser.add_argument('--lr', dest='lr',
                    default=0.01, type=float,
                    help='Learning rate.')
parser.add_argument('--epochs', dest='epochs',
                    default=10, type=int,
                    help='Number of epochs.')
parser.add_argument('--steps', dest='steps',
                    default=200, type=int,
                    help='Number of steps per epoch.')
parser.add_argument('--distribute', dest='distribute', type=str, default='single',
                    help='distributed training strategy')
args = parser.parse_args()

print('Python Version = {}'.format(sys.version))
print('TensorFlow Version = {}'.format(tf.__version__))
print('TF_CONFIG = {}'.format(os.environ.get('TF_CONFIG', 'Not found')))
print('DEVICES', device_lib.list_local_devices())

# Single Machine, single compute device
if args.distribute == 'single':
    if tf.test.is_gpu_available():
        strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
    else:
        strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
# Single Machine, multiple compute device
elif args.distribute == 'mirror':
    strategy = tf.distribute.MirroredStrategy()
# Multiple Machine, multiple compute device
elif args.distribute == 'multi':
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
else:
    raise ValueError('Unknown --distribute value: {}'.format(args.distribute))

# Multi-worker configuration
print('num_replicas_in_sync = {}'.format(strategy.num_replicas_in_sync))

# Preparing dataset
BUFFER_SIZE = 10000
BATCH_SIZE = 64

def make_datasets_unbatched():
  # Scaling CIFAR10 data from [0, 255] to [0., 1.]
  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255.0
    return image, label

  datasets, info = tfds.load(name='cifar10',
                            with_info=True,
                            as_supervised=True)
  return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE).repeat()


# Build the Keras model
def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(32, 32, 3)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(10, activation='softmax')
  ])
  model.compile(
      loss=tf.keras.losses.sparse_categorical_crossentropy,
      optimizer=tf.keras.optimizers.SGD(learning_rate=args.lr),
      metrics=['accuracy'])
  return model

# Train the model
NUM_WORKERS = strategy.num_replicas_in_sync
# Here the batch size scales up by number of workers since
# `tf.data.Dataset.batch` expects the global batch size.
GLOBAL_BATCH_SIZE = BATCH_SIZE * NUM_WORKERS
train_dataset = make_datasets_unbatched().batch(GLOBAL_BATCH_SIZE)

with strategy.scope():
  # Model building/compiling needs to be within `strategy.scope()`
  # so that the model variables are created under the distribution strategy.
  model = build_and_compile_cnn_model()

model.fit(x=train_dataset, epochs=args.epochs, steps_per_epoch=args.steps)
model.save(args.model_dir)
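Because `tf.data` batches with the global batch size, the number of examples consumed per step grows with the number of replicas while each replica still sees `BATCH_SIZE` examples. A quick arithmetic sketch; the helper names are hypothetical:

```python
BATCH_SIZE = 64  # per-replica batch size, as in task.py


def global_batch_size(per_replica_batch, num_replicas):
    """tf.data batches with the global size; the strategy splits it across replicas."""
    return per_replica_batch * num_replicas


def examples_per_epoch(per_replica_batch, num_replicas, steps_per_epoch):
    return global_batch_size(per_replica_batch, num_replicas) * steps_per_epoch


# One device with the default 200 steps: 64 * 1 * 200 examples per epoch.
print(examples_per_epoch(BATCH_SIZE, 1, 200))  # 12800
# Four replicas under MirroredStrategy: each step consumes 256 examples.
print(global_batch_size(BATCH_SIZE, 4))  # 256
```

With the 50,000-image CIFAR-10 training split, 12,800 examples is only about a quarter of the data per "epoch", which is one reason a short run gives modest accuracy.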

#### Store training script on your Cloud Storage bucket

Next, package the training folder into a compressed tarball, and then store it in your Cloud Storage bucket.

In [None]:
! rm -f custom.tar custom.tar.gz
! tar cvf custom.tar custom
! gzip custom.tar
! gsutil cp custom.tar.gz gs://$BUCKET_NAME/trainer_cifar.tar.gz
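The `tar` and `gzip` steps above can also be done with Python's standard `tarfile` module, which writes the gzip-compressed archive in one step. A minimal sketch, run against a throwaway directory; `make_tarball` is a hypothetical helper, and you would still copy the result to Cloud Storage with `gsutil cp`:

```python
import tarfile
import tempfile
from pathlib import Path


def make_tarball(src_dir, out_path):
    """Write src_dir into a gzip-compressed tarball, keeping the top-level folder name."""
    src = Path(src_dir)
    with tarfile.open(out_path, "w:gz") as tar:
        tar.add(src, arcname=src.name)  # adds the directory recursively
    return out_path


with tempfile.TemporaryDirectory() as tmp:
    pkg = Path(tmp) / "custom" / "trainer"
    pkg.mkdir(parents=True)
    (pkg / "__init__.py").touch()
    out = make_tarball(Path(tmp) / "custom", Path(tmp) / "custom.tar.gz")
    with tarfile.open(out, "r:gz") as tar:
        print(sorted(tar.getnames()))
        # ['custom', 'custom/trainer', 'custom/trainer/__init__.py']
```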

### Train the model

Start the training of your custom training job on AI Platform. Use the helper function from a previous section, `create_custom_job`, which will return the AI Platform (Unified) fully qualified identifier assigned to the `CustomJob`.

In [None]:
# Save the job name
JOB_ID = create_custom_job(CUSTOM_JOB)

### Get information on a `CustomJob`

Get the status of your custom training job for CIFAR10 using the helper function `get_custom_job`, defined earlier. The job is most likely still either PENDING or RUNNING.

In [None]:
response = get_custom_job(JOB_ID)

# Deployment

Once your model is done training, you can estimate how long the job took by subtracting its `create_time` from its `update_time`; this span also includes any time the job spent pending. To deploy the model, you need the location of the SavedModel directory (the directory containing `saved_model.pb`) that the training script wrote to your Cloud Storage bucket: `MODEL_DIR`, or `MODEL_DIR + '/model'` when `DIRECT = False`.


In [None]:
import time

# Poll once a minute until the job succeeds, fails, or is cancelled.
while True:
    response = get_custom_job(JOB_ID, True)
    if response.state != aip.JobState.JOB_STATE_SUCCEEDED:
        print("Training job has not completed:", response.state)
        model_path_to_deploy = None
        if response.state in (
            aip.JobState.JOB_STATE_FAILED,
            aip.JobState.JOB_STATE_CANCELLED,
        ):
            break
    else:
        if not DIRECT:
            MODEL_DIR = MODEL_DIR + "/model"
        model_path_to_deploy = MODEL_DIR
        print("Training Time:", response.update_time - response.create_time)
        break
    time.sleep(60)

print("model_to_deploy:", model_path_to_deploy)
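The polling loop can also be written as a reusable helper with an optional timeout, which is easier to test because the status lookup and the sleep function are passed in. A minimal sketch; `wait_for_job` is hypothetical, and it compares plain state-name strings so it runs without the client library (with the client you would compare against `aip.JobState` members instead):

```python
import time

TERMINAL_STATES = {"JOB_STATE_SUCCEEDED", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}


def wait_for_job(get_state, poll_seconds=60, timeout_seconds=None, sleep=time.sleep):
    """Poll get_state() until it returns a terminal state or the timeout expires."""
    waited = 0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if timeout_seconds is not None and waited >= timeout_seconds:
            raise TimeoutError(f"job still {state} after {waited} s")
        sleep(poll_seconds)
        waited += poll_seconds


# Simulated job that finishes on the third poll; sleeping is skipped.
states = iter(["JOB_STATE_PENDING", "JOB_STATE_RUNNING", "JOB_STATE_SUCCEEDED"])
print(wait_for_job(lambda: next(states), sleep=lambda s: None))  # JOB_STATE_SUCCEEDED
```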

## Load the saved model

Your model is stored in a TensorFlow SavedModel format in a Cloud Storage bucket. Load it from the Cloud Storage bucket in order to evaluate the model and get a prediction.

To load the model, use the `tf.keras.models.load_model()` method, passing it the Cloud Storage path where the model is saved -- specified by `MODEL_DIR`.

In [None]:
import tensorflow as tf

model = tf.keras.models.load_model(MODEL_DIR)

## Evaluate the model

Now, find out how good the model is. 

### Load evaluation data

You will load the CIFAR10 test (holdout) data from `tf.keras.datasets`, using the method `load_data()`. This will return the dataset as a tuple of two elements. The first element is the training data and the second is the test data. Each element is also a tuple of two elements: the image data, and the corresponding labels.

You don't need the training data, so the code loads it as `(_, _)`.

Before you can run the data through evaluation, you need to preprocess it:

* x_test: Normalize (rescale) the pixel data by dividing each pixel by 255. This will replace each single byte integer pixel with a 32-bit floating point number between 0 and 1.

* y_test: The labels are already scalar (sparse). If you look back at the `compile()` step in the `trainer/task.py` script, you will find that it was compiled for sparse labels. Therefore you don't need to perform any preprocessing.

In [None]:
import numpy as np
from tensorflow.keras.datasets import cifar10

(_, _), (x_test, y_test) = cifar10.load_data()
x_test = (x_test / 255.0).astype(np.float32)

print(x_test.shape, y_test.shape)
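As a quick sanity check that the evaluation data is scaled the same way as the training data, the rescaling can be wrapped in a function and checked on a tiny array. A minimal NumPy sketch; `normalize` is a hypothetical helper:

```python
import numpy as np


def normalize(images_uint8):
    """Scale uint8 pixels in [0, 255] to float32 in [0.0, 1.0], as in task.py."""
    return (images_uint8 / 255.0).astype(np.float32)


batch = np.array([[[0, 128, 255]]], dtype=np.uint8)
scaled = normalize(batch)
print(scaled.dtype, scaled.min(), scaled.max())  # float32 0.0 1.0
```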

### Evaluate the model

Evaluate how well the ConvNet model trained by the `CustomJob` performed. You will probably see an accuracy of around 37%, which is not very good. However, this low performance is expected because you only trained for 20 epochs and 100 steps per epoch: see the `fit()` call in `task.py`.

In [None]:
model.evaluate(x_test, y_test)
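`model.evaluate` reports the loss and the accuracy metric set in `compile()`. If you want the accuracy from raw predictions instead, for example from `model.predict(x_test)`, take the argmax over the ten class probabilities and compare it with the labels. A minimal NumPy sketch with made-up probabilities; `accuracy_from_probs` is a hypothetical helper, and `np.ravel` flattens labels shaped `(N, 1)` like `y_test`:

```python
import numpy as np


def accuracy_from_probs(probs, labels):
    """Top-1 accuracy: compare argmax of class probabilities with integer labels."""
    predicted = np.argmax(probs, axis=1)
    return float(np.mean(predicted == np.ravel(labels)))


probs = np.array([[0.1, 0.7, 0.2], [0.6, 0.3, 0.1]])
labels = np.array([[1], [2]])  # shaped (N, 1), like y_test
print(accuracy_from_probs(probs, labels))  # 0.5
```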

## Upload the model for serving

Next, upload your TensorFlow model to AI Platform in order to create an AI Platform `Model` resource. Before uploading, you define a serving function that converts request data to the format your `Model` expects, and save it with the model. If you send encoded data to AI Platform, your serving function ensures that the data is decoded on the model server before it is passed as input to your `Model`.

### How does the serving function work

When you send a request to an online prediction server, the request is received by an HTTP server. The HTTP server extracts the prediction request from the HTTP request content body. The extracted prediction request is forwarded to the serving function. If you use one of AI Platform's pre-built prediction containers, the request content is passed to the serving function as a `tf.string`.

The serving function consists of two parts:

- `preprocessing function`:  
  - Converts the input (`tf.string`) to the input shape and data type of the underlying model (dynamic graph).
  - Performs the same preprocessing of the data that was done during training of the underlying model; for example, normalizing input data.
- `post-processing function`:  
  - Converts the model output to the format expected by the receiving application; for example, compressing the output.
  - Packages the output for the receiving application; for example, adding headings or making a JSON object.
  
Both the preprocessing and post-processing functions are converted to static graphs which are fused to the model. The output from the underlying model is passed to the post-processing function. The post-processing function passes the converted/packaged output back to the HTTP server. The HTTP server returns the output as the HTTP response content.

Keep in mind that serving functions for TensorFlow Keras models run as static graphs. This means you cannot use TensorFlow operations that require a dynamic (eager) graph. If you do, you will get an error when you compile the serving function indicating that you are using an EagerTensor, which is not supported.

### Serving function for image data

To pass images to the prediction service, you encode the compressed (for example, JPEG) image bytes using base64 -- which makes the content safe from modification while transmitting binary data over the network. Since this TensorFlow model expects input data as raw (uncompressed) bytes, you need to ensure that the base64-encoded data gets converted back to raw bytes before it is passed as input to the TensorFlow model.

To resolve this, define a serving function (`serving_fn`) and attach it to the model as a preprocessing step. Add a `@tf.function` decorator so the serving function is fused to the underlying model (instead of upstream on a CPU).

When you send a prediction or explanation request, the content of the request is base64-decoded into a TensorFlow string (`tf.string`), which is passed to the serving function (`serving_fn`). The serving function preprocesses the `tf.string` into a float32 image tensor (`preprocess_fn`) to match the input requirements of the model:
- `tf.io.decode_jpeg` - Decompresses the JPEG image into a uint8 tensor with three channels (RGB).
- `tf.cast` - Converts the integer pixel values to float32 without rescaling them.
- `tf.image.resize` - Resizes the image to match the 32x32 input shape of the model.
- `resized / 255.0` - Rescales (normalizes) the pixel data to between 0 and 1, as the `scale()` function in `task.py` does during training.

At this point, the data can be passed to the model (`m_call`).

In [None]:
CONCRETE_INPUT = "numpy_inputs"


def _preprocess(bytes_input):
    decoded = tf.io.decode_jpeg(bytes_input, channels=3)
    # Cast without rescaling; the division by 255 below matches training.
    decoded = tf.cast(decoded, tf.float32)
    resized = tf.image.resize(decoded, size=(32, 32))
    rescale = resized / 255.0
    return rescale


@tf.function(input_signature=[tf.TensorSpec([None], tf.string)])
def preprocess_fn(bytes_inputs):
    decoded_images = tf.map_fn(
        _preprocess, bytes_inputs, dtype=tf.float32, back_prop=False
    )
    return {
        CONCRETE_INPUT: decoded_images
    }  # User needs to make sure the key matches model's input


m_call = tf.function(model.call).get_concrete_function(
    [tf.TensorSpec(shape=[None, 32, 32, 3], dtype=tf.float32, name=CONCRETE_INPUT)]
)


@tf.function(input_signature=[tf.TensorSpec([None], tf.string)])
def serving_fn(bytes_inputs):
    images = preprocess_fn(bytes_inputs)
    prob = m_call(**images)
    return prob


tf.saved_model.save(
    model,
    model_path_to_deploy,
    signatures={
        "serving_default": serving_fn,
    },
)
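`tf.image.convert_image_dtype` rescales integer images when it converts them to a float type (uint8 values are divided by 255), so combining it with a separate `/ 255.0` step would scale the pixels twice. The model still returns valid-looking probabilities in that case, which makes this kind of train/serve mismatch easy to miss. The NumPy sketch below only mimics the arithmetic of those TensorFlow ops; it does not call TensorFlow:

```python
import numpy as np

pixels = np.array([0, 128, 255], dtype=np.uint8)

# What training does in task.py: cast to float, then divide by 255.
trained_scale = pixels.astype(np.float32) / 255.0

# Mimics convert_image_dtype(uint8 -> float32), which already maps to [0, 1].
converted = pixels.astype(np.float32) / np.float32(255)
# A second division by 255.0, as an extra normalization step would do.
double_scaled = converted / 255.0

print(trained_scale.max())  # 1.0
print(double_scaled.max() < 0.01)  # True
```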

## Get the serving function signature

You can get the signatures of your model's input and output layers by reloading the model into memory, and querying it for the signatures corresponding to each layer.

For your purpose, you need the signature of the serving function. When you send a prediction request as an HTTP request packet, the image data is base64-encoded, and the TensorFlow model takes numpy input. The AI Platform pre-built prediction container converts the base64-encoded data to image bytes, and your serving function converts the image bytes to the float32 tensor the model expects.

When making a prediction request, you need to route the request to the serving function instead of the model, so you need to know the input layer name of the serving function.

In [None]:
loaded = tf.saved_model.load(model_path_to_deploy)

input_name = list(
    loaded.signatures["serving_default"].structured_input_signature[1].keys()
)[0]
print("Serving function input:", input_name)

### Debugging the serving function

Now that you've loaded the model with the serving function, you can debug the serving function locally before deploying to a prediction server.

#### Test instance

Make a test instance in the same form that the HTTP server in the prediction container passes to the serving function.

Start by taking an arbitrary raw (uncompressed) image from your test data (`x_test[0]`). When the request is sent to the prediction server, the image will be sent as a compressed JPEG image that has not been preprocessed.

- Reverse the preprocessing: `(x_test[0] * 255).astype(np.uint8)`
- Compress into JPEG format: `Image.fromarray(raw).save('tmp.jpg')`
- Get the raw bytes: `jpeg_bytes = tf.io.read_file('tmp.jpg')`
- Convert the raw bytes to a `tf.string` tensor: `tf.convert_to_tensor([jpeg_bytes], tf.string)`

In [None]:
from PIL import Image

raw = (x_test[0] * 255).astype(np.uint8)
image = Image.fromarray(raw)
image.save("tmp.jpg")

# Use a name that does not shadow the built-in `bytes`.
jpeg_bytes = tf.io.read_file("tmp.jpg")
tensor = tf.convert_to_tensor([jpeg_bytes], tf.string)

#### Call the serving function

Pass the data to the serving function and get the response.

- Get the serving function from the fused model: `loaded.signatures["serving_default"]`
- Run the serving function on the local CPU: `with tf.device("cpu:0")`
- Invoke the serving function: `serving_function(tensor)`

In [None]:
serving_function = loaded.signatures["serving_default"]
with tf.device("cpu:0"):
    response = serving_function(tensor)

Print the response. You will see in the response the probability values for the ten CIFAR10 classes.

In [None]:
print(response)
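The response is a dictionary that maps the serving signature's output name to a tensor of shape (1, 10), so `list(response.values())[0].numpy()[0]` gives the ten probabilities for the test image. To turn them into a label, pick the highest-scoring class. A minimal pure-Python sketch, assuming the standard CIFAR-10 label order listed in the dataset description (airplane = 0 through truck = 9); `top_class` is a hypothetical helper and the probabilities are made up:

```python
CIFAR10_CLASSES = ["airplane", "automobile", "bird", "cat", "deer",
                   "dog", "frog", "horse", "ship", "truck"]


def top_class(probabilities):
    """Return (class_name, probability) for the highest-scoring class."""
    best = max(range(len(probabilities)), key=lambda i: probabilities[i])
    return CIFAR10_CLASSES[best], probabilities[best]


probs = [0.01, 0.02, 0.05, 0.6, 0.1, 0.1, 0.05, 0.03, 0.02, 0.02]
print(top_class(probs))  # ('cat', 0.6)
```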

### Upload the model

Use this helper function `upload_model` to upload your model, stored in SavedModel format, to the model service, which will create an AI Platform `Model` resource. Once you've done that, you can use the model in the same way as any other AI Platform `Model` resource; for example, you can deploy it to an `Endpoint` for serving predictions.

The helper function takes the parameters:

- `display_name`: A human readable name for the model.
- `image_uri`: The container image for the model deployment.
- `model_uri`: The Cloud Storage path to your SavedModel artifact. For this tutorial, this is the Cloud Storage location where the `trainer/task.py` script saved the model, which is stored in the variable `model_path_to_deploy`.

The helper function uses the model client service's `upload_model` method, which takes the parameters:

- `parent`: The AI Platform location root path for `Dataset`, `Model` and `Endpoint` resources. 
- `model`: The specification for the AI Platform (Unified) `Model` resource.

The `model` parameter is a dictionary object that consists of the following fields:

- `display_name`: A human readable name for the model.
- `metadata_schema_uri`: Since the model was built without an AI Platform (Unified) managed dataset, leave this blank (`''`).
- `artifact_uri`: The Cloud Storage path where the model is stored in SavedModel format.
- `container_spec`: This is the specification for the Docker container that will run on the `Endpoint` to serve predictions. Use the variable you set earlier: `DEPLOY_GPU != None` to use a GPU; otherwise only a CPU is allocated.

Uploading a model into an AI Platform `Model` resource returns a long-running operation, since it might take a few moments. You call `response.result()`, which is a synchronous call and will return when the AI Platform `Model` resource is ready.

The helper function returns the AI Platform fully qualified identifier for the corresponding AI Platform model instance `upload_model_response.model`. You will save the identifier for subsequent steps in the variable `model_to_deploy_name`.

In [None]:
IMAGE_URI = DEPLOY_IMAGE


def upload_model(display_name, image_uri, model_uri):
    model = {
        "display_name": display_name,
        "metadata_schema_uri": "",
        "artifact_uri": model_uri,
        "container_spec": {
            "image_uri": image_uri,
            "command": [],
            "args": [],
            "env": [{"name": "env_name", "value": "env_value"}],
            "ports": [{"container_port": 8080}],
            "predict_route": "",
            "health_route": "",
        },
    }
    response = clients["model"].upload_model(parent=PARENT, model=model)
    print("Long running operation:", response.operation.name)
    upload_model_response = response.result(timeout=180)
    print("upload_model_response")
    print(" model:", upload_model_response.model)
    return upload_model_response.model


model_to_deploy_name = upload_model(
    "cifar10-" + TIMESTAMP, IMAGE_URI, model_path_to_deploy
)
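The identifier returned by `upload_model` is a fully qualified resource name of the form `projects/{project}/locations/{location}/models/{model_id}`; `Endpoint` names follow the same pattern with `endpoints`. When you need just one component, for example the numeric model ID, a small parser is enough. `parse_resource_name` is a hypothetical helper, and the example name is made up:

```python
def parse_resource_name(name):
    """Split 'projects/P/locations/L/models/ID' into a dict of collection -> id."""
    parts = name.split("/")
    if len(parts) % 2 != 0:
        raise ValueError(f"not a collection/id path: {name!r}")
    # Even positions are collection names, odd positions are their IDs.
    return dict(zip(parts[0::2], parts[1::2]))


ids = parse_resource_name("projects/my-project/locations/us-central1/models/1234567890")
print(ids["models"])  # 1234567890
```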

### List all models

Now that your custom model is uploaded as an AI Platform `Model`, get a list of all your `Model` resources. Use the following helper function, `list_models`. This helper function calls the AI Platform model client service's `list_models` method, with the following parameter:

- `parent`: The AI Platform location root path for your `Dataset`, `Model`, and `Endpoint` resources.

The response object from the call is a list, where each element is an AI Platform `Model`. For each `Model`, you will see a few fields:

- `name`: The AI Platform (Unified) unique identifier for the `Model`.
- `display_name`: The human readable name assigned to the model.
- `create_time`: Timestamp when the model resource was created.
- `update_time`: Timestamp when the model resource was last updated.
- `container`: The container image used to serve the model.
- `artifact_uri`: The Cloud Storage location of the model artifact.

In [None]:
def list_models():
    response = clients["model"].list_models(parent=PARENT)
    for model in response:
        print("name", model.name)
        print("display_name", model.display_name)
        print("create_time", model.create_time)
        print("update_time", model.update_time)
        print("container", model.container_spec.image_uri)
        print("artifact_uri", model.artifact_uri)
        print("\n")


list_models()

### Get model information

Get the information for just your `Model`. Use the helper function `get_model`, with the following parameter:

- `name`: The AI Platform unique identifier for the `Model`.

This helper function calls the AI Platform model client service's `get_model` method, with the following parameter:

- `name`: The AI Platform unique identifier for the `Model`.

In [None]:
def get_model(name):
    response = clients["model"].get_model(name=name)
    print(response)


get_model(model_to_deploy_name)

### Create an `Endpoint`

Use the helper function `create_endpoint` to create an `Endpoint` to deploy the `Model` to for serving predictions, with the following parameter:

- `display_name`: A human readable name for the `Endpoint`.

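The helper function uses the endpoint client service's `create_endpoint` method, which takes the parameters:

- `parent`: The AI Platform location root path for your `Dataset`, `Model`, and `Endpoint` resources.
- `endpoint`: The specification for the `Endpoint`; here, a dictionary containing only its `display_name`.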

Creating an `Endpoint` returns a long-running operation, since it might take a few moments to provision the endpoint for serving. You will call `response.result()`, which is a synchronous call and will return when the `Endpoint` is ready. The helper function returns the AI Platform fully qualified identifier for the `Endpoint` in the `response.name` field.


In [None]:
ENDPOINT_NAME = "cifar10_endpoint-" + TIMESTAMP


def create_endpoint(display_name):
    endpoint = {"display_name": display_name}
    response = clients["endpoint"].create_endpoint(parent=PARENT, endpoint=endpoint)
    print("Long running operation:", response.operation.name)

    result = response.result(timeout=300)
    print("result")
    print(" name:", result.name)
    print(" display_name:", result.display_name)
    print(" description:", result.description)
    print(" labels:", result.labels)
    print(" create_time:", result.create_time)
    print(" update_time:", result.update_time)
    return result.name


endpoint_name = create_endpoint(ENDPOINT_NAME)

### Prediction scaling

You have several choices on scaling the VMs for handling your online prediction requests:

- Single Instance: The online prediction requests are processed on a single VM.
  - Set the minimum (`MIN_NODES`) and maximum (`MAX_NODES`) number of compute instances to one. 


- Manual Scaling: The online prediction requests are split across a fixed number of VMs that you specify manually.
  - Set the minimum (`MIN_NODES`) and maximum (`MAX_NODES`) number of VMs to the same number of nodes. When a `Model` is first deployed to the `Endpoint`, the fixed number of compute instances are provisioned and online prediction requests are evenly distributed across them.
  

- Auto Scaling: The online prediction requests are streamed across an initial number of VMs. Based on time and compute tradeoffs, AI Platform might automatically scale your `DeployedModel` to provision and deprovision compute instances.
  - Set the minimum (`MIN_NODES`) to the initial number of VMs and the maximum (`MAX_NODES`) to the maximum number of VMs that you want the service to scale to.
  
The minimum number of VMs corresponds to the field `min_replica_count` and the maximum number of VMs corresponds to the field `max_replica_count`.

In [None]:
MIN_NODES = 1
MAX_NODES = 1
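The three options map onto the (`MIN_NODES`, `MAX_NODES`) pair as follows. This hypothetical `scaling_mode` helper only encodes the rules described above, assuming at least one replica; it does not call the service:

```python
def scaling_mode(min_nodes, max_nodes):
    """Classify a (min_replica_count, max_replica_count) pair."""
    if min_nodes < 1 or max_nodes < min_nodes:
        raise ValueError("need 1 <= min_nodes <= max_nodes")
    if max_nodes == min_nodes:
        return "single instance" if min_nodes == 1 else "manual scaling"
    return "auto scaling"


print(scaling_mode(1, 1))  # single instance
print(scaling_mode(3, 3))  # manual scaling
print(scaling_mode(1, 4))  # auto scaling
```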

### Deploy model to the endpoint

Use the helper function `deploy_model` to deploy the `Model` to the `Endpoint` you created for serving predictions, with the following parameters:

- `model`: The AI Platform fully qualified identifier of the `Model` to deploy.
- `deployed_model_display_name`: A human readable name for the `DeployedModel`.
- `endpoint`: The AI Platform fully qualified `Endpoint` identifier to deploy the `Model` to.

The helper function calls the endpoint client service's `deploy_model` method, which takes the following parameters:

- `endpoint`: The AI Platform fully qualified `Endpoint` identifier to deploy the `Model` to.
- `deployed_model`: The requirements for deploying the `Model`.
- `traffic_split`: Percent of the `Endpoint`'s traffic that you want to go to this `DeployedModel`. This is specified as a dictionary of one or more key/value pairs.
   - If only one model, then specify as **{ "0": 100 }**, where "0" refers to the new `DeployedModel` and 100 means 100% of the traffic.
   - If there are existing models on the endpoint, among which the traffic will be split, specify it as follows, where `model_id` is the ID of a `DeployedModel` already on the endpoint. The percentages must add up to 100.
   
           { "0": percent, model_id: percent, ... }

The `deployed_model` parameter is specified as a Python dictionary with the minimum required fields:

- `model`: The AI Platform fully qualified identifier of the `Model` to deploy.
- `display_name`: A human readable name for the `DeployedModel`.
- `dedicated_resources`: This refers to how many VMs (replicas) are scaled for serving prediction requests.  
  - `machine_spec`: The compute resources to provision for each VM. Use the variable you set earlier: `DEPLOY_GPU != None` to use a GPU; otherwise only a CPU is allocated.  
  - `min_replica_count`: The number of VMs to initially provision, which you set earlier as the variable `MIN_NODES`.
  - `max_replica_count`: The maximum number of VMs to scale to, which you set earlier as the variable `MAX_NODES`.
- `enable_container_logging`: Optional, and not set by the helper below. This enables logging of container events, such as execution failures. The default is False. This is typically set to True when debugging the deployment and then set to False when deployed for production.


Configuring the `traffic_split` to split traffic between multiple `DeployedModel` resources can help in a situation like the following. Suppose you already have a previous version of your model deployed in production as a `DeployedModel` called `v1`. A new model, which you will deploy as `v2`, has better evaluation results, but you don't know for certain that it is really better until you deploy it to production. In this case, you might deploy `v2` to the same `Endpoint` as `v1` but allocate it only 10% of the traffic. That way, you can monitor how well it does without disrupting the majority of users until you make a final decision.

In [None]:
DEPLOYED_NAME = "cifar10_deployed-" + TIMESTAMP


def deploy_model(
    model, deployed_model_display_name, endpoint, traffic_split={"0": 100}
):

    if DEPLOY_GPU:
        machine_spec = {
            "machine_type": DEPLOY_COMPUTE,
            "accelerator_type": DEPLOY_GPU,
            "accelerator_count": DEPLOY_NGPU,
        }
    else:
        machine_spec = {
            "machine_type": DEPLOY_COMPUTE,
            "accelerator_count": 0,
        }

    deployed_model = {
        "model": model,
        "display_name": deployed_model_display_name,
        "dedicated_resources": {
            "min_replica_count": MIN_NODES,
            "max_replica_count": MAX_NODES,
            "machine_spec": machine_spec,
        },
    }

    response = clients["endpoint"].deploy_model(
        endpoint=endpoint, deployed_model=deployed_model, traffic_split=traffic_split
    )

    print("Long running operation:", response.operation.name)
    result = response.result()
    print("result")
    deployed_model = result.deployed_model
    print(" deployed_model")
    print("  id:", deployed_model.id)
    print("  model:", deployed_model.model)
    print("  display_name:", deployed_model.display_name)
    print("  create_time:", deployed_model.create_time)

    return deployed_model.id


deployed_model_id = deploy_model(model_to_deploy_name, DEPLOYED_NAME, endpoint_name)
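For the `v1`/`v2` scenario described above, the `traffic_split` passed to `deploy_model` would send 10% to the new model (key `"0"`) and 90% to the existing one. A minimal sketch; `canary_split` is a hypothetical helper, and `"1234567890"` stands in for the real `DeployedModel` ID of `v1`:

```python
def canary_split(existing_id, new_percent):
    """Send new_percent to the model being deployed ("0") and the rest to existing_id."""
    if not 0 <= new_percent <= 100:
        raise ValueError("new_percent must be between 0 and 100")
    return {"0": new_percent, existing_id: 100 - new_percent}


print(canary_split("1234567890", 10))  # {'0': 10, '1234567890': 90}
```

You would then pass the result as the `traffic_split` argument of the `deploy_model` helper when deploying `v2`.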

### List all endpoints

Get a list of all your `Endpoint` resources. Use the helper function `list_endpoints`. 

The helper function calls the endpoint client service's `list_endpoints` method. The returned response object is a list, with an element for each `Endpoint`. The helper function prints a few example fields for each `Endpoint`:

- `name`: The AI Platform identifier for the `Endpoint`.
- `display_name`: The human readable name you assigned to the `Endpoint`.
- `create_time`: When the `Endpoint` was created.
- `deployed_models`: The `DeployedModel`s and associated information that are deployed to this `Endpoint`.

In [None]:
def list_endpoints():
    response = clients["endpoint"].list_endpoints(parent=PARENT)
    for endpoint in response:
        print("name:", endpoint.name)
        print("display name:", endpoint.display_name)
        print("create_time:", endpoint.create_time)
        print("deployed_models", endpoint.deployed_models)
        print("\n")


list_endpoints()

### Get information on this endpoint

Get information for just your endpoint. Use the helper function `get_endpoint`, with the following parameter:

- `name`: The AI Platform unique identifier for the `Endpoint`.

This helper function calls the endpoint client service's `get_endpoint` method, with the following parameter:

- `name`: The AI Platform unique identifier for the managed `Endpoint`.

In [None]:
def get_endpoint(name):
    response = clients["endpoint"].get_endpoint(name=name)
    print(response)


get_endpoint(endpoint_name)

## Make a prediction request

Let's now get a prediction from the `Endpoint`. You will use an arbitrary image out of the test (holdout) portion of the dataset as a test image. 

In [None]:
test_image = x_test[0]
test_label = y_test[0]
print(test_image.shape)

### Prepare the request content
You are going to send the CIFAR10 image as a compressed JPEG image, instead of the raw uncompressed bytes:

- `Image.save`: Use Pillow (PIL) to write the uncompressed image to disk as a compressed JPEG image.
- `tf.io.read_file`: Read the compressed JPEG image back into memory as raw bytes.
- `base64.b64encode`: Encode the raw bytes into a base64-encoded string.

In [None]:
import base64

from PIL import Image

image = Image.fromarray((test_image * 255).astype(np.uint8))
image.save("tmp.jpg")

jpeg_bytes = tf.io.read_file("tmp.jpg")
b64str = base64.b64encode(jpeg_bytes.numpy()).decode("utf-8")
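Base64 turns arbitrary bytes into ASCII text that can travel inside a JSON request, and decoding recovers the bytes exactly, at the cost of roughly a third more data. A stdlib-only sketch of the round trip, using a short made-up byte string in place of real JPEG data:

```python
import base64

sample_bytes = b"\xff\xd8\xff\xe0fake-jpeg-payload"  # stand-in for real JPEG file bytes
encoded = base64.b64encode(sample_bytes).decode("utf-8")  # ASCII-safe text for JSON
decoded = base64.b64decode(encoded)

print(encoded.isascii(), decoded == sample_bytes)  # True True
# Every 3 input bytes become 4 output characters.
print(len(base64.b64encode(b"\x00" * 300)))  # 400
```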

### Send the prediction request

To send a prediction request, use the helper function `predict_image`, which takes the following parameters:

- `image`: The test image as a base64-encoded JPEG string (`b64str`).
- `endpoint`: The AI Platform fully qualified identifier for the `Endpoint`.
- `parameters_dict`: Additional parameters for serving -- in this case, None.

This function calls the prediction client service's `predict` method with the following parameters:

- `endpoint`: The AI Platform fully qualified identifier for the `Endpoint`.
- `instances`: A list of instances (encoded images) to predict.
- `parameters`: Additional parameters for serving -- in this case, None.

To pass the image data to the prediction service, in the previous step you encoded the bytes into base64, which makes the content safe from modification when transmitting binary data over the network. You need to tell the pre-built container that the content has been base64-encoded, so it decodes the content on the other end before passing it to your TensorFlow graph.

Each instance in the prediction request is a dictionary with the following form:

                        {input_name: {'b64': content}}
                        
- `input_name`: The name of the serving function's input, which you retrieved earlier from the `serving_default` signature.
- `'b64'`: A key that indicates the content is base64-encoded.
- `content`: The compressed JPEG image bytes as a base64-encoded string.

Since the `predict()` service can take multiple images (instances), you will send your single image as a list of one image. As a final step, you package the instances list into Google's protobuf format; this is what you pass to the `predict()` service.

The `response` object contains a list of predictions, one for each image in the request, in the same order. You will see in the output for each prediction:

- Confidence level for the prediction (`predictions`), between 0 and 1, for each of the ten classes.

In [None]:
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value


def predict_image(image, endpoint, parameters_dict):
    # The format of each instance should conform to the deployed model's prediction input schema.
    instances_list = [{input_name: {"b64": image}}]
    instances = [json_format.ParseDict(s, Value()) for s in instances_list]

    response = clients["prediction"].predict(
        endpoint=endpoint, instances=instances, parameters=parameters_dict
    )
    print("response")
    print(" deployed_model_id:", response.deployed_model_id)
    predictions = response.predictions
    print("predictions")
    for prediction in predictions:
        # See gs://google-cloud-aiplatform/schema/predict/prediction/classification.yaml for the format of the predictions.
        print(" prediction:", prediction)


predict_image(b64str, endpoint_name, None)

## Undeploy the model

Undeploy your `DeployedModel` from the serving `Endpoint`. Use the helper function `undeploy_model`, which takes the following parameters:

- `deployed_model_id`: The `DeployedModel` identifier returned by the `Endpoint` service when you deployed the `Model` as a `DeployedModel`.
- `endpoint`: The AI Platform fully qualified identifier for the `Endpoint` where the `DeployedModel` is deployed.

This function calls the endpoint client service's `undeploy_model` method, with the following parameters:

- `deployed_model_id`: The `DeployedModel` identifier returned by the `Endpoint` service when you deployed the `Model` as a `DeployedModel`.
- `endpoint`: The AI Platform fully qualified identifier for the `Endpoint` where the `DeployedModel` is deployed.
- `traffic_split`: How to split traffic among the remaining `DeployedModel`s on the `Endpoint`.

Since this is the only `DeployedModel` on the `Endpoint`, you can leave `traffic_split` empty by setting it to `{}`.
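`traffic_split` maps each `DeployedModel` ID to the integer percentage of the `Endpoint`'s traffic it receives; as we understand the API reference, the values must add up to 100, or the map must be empty when no model should receive traffic. The hypothetical helper below sketches one way to compute a new split when more than one model is deployed: it spreads the removed model's share over the remaining models in proportion to their current shares, using largest-remainder rounding. It is not part of the client library.

```python
def traffic_split_after_undeploy(current: dict, removed_id: str) -> dict:
    """Hypothetical helper: recompute traffic_split after undeploying one model.

    `current` maps DeployedModel IDs to integer percentages that sum to 100.
    Returns {} when no models remain, otherwise a split that sums to 100.
    """
    remaining = {k: v for k, v in current.items() if k != removed_id}
    if not remaining:
        return {}
    total = sum(remaining.values())
    if total == 0:
        # Every remaining model had 0%: weight them equally instead.
        remaining = {k: 1 for k in remaining}
        total = len(remaining)
    # Integer proportional shares: floor each one and keep the remainder.
    shares = {k: divmod(v * 100, total) for k, v in remaining.items()}
    split = {k: quotient for k, (quotient, _) in shares.items()}
    # Give the leftover percentage points to the models with the largest remainders.
    leftover = 100 - sum(split.values())
    by_remainder = sorted(remaining, key=lambda k: shares[k][1], reverse=True)
    for k in by_remainder[:leftover]:
        split[k] += 1
    return split


print(traffic_split_after_undeploy({"111": 100}, "111"))  # → {}
print(traffic_split_after_undeploy({"111": 50, "222": 30, "333": 20}, "111"))  # → {'222': 60, '333': 40}
```

With a single deployed model, as in this tutorial, the helper returns `{}`, which matches the value passed to `undeploy_model`.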

In [None]:
def undeploy_model(deployed_model_id, endpoint):
    response = clients["endpoint"].undeploy_model(
        endpoint=endpoint, deployed_model_id=deployed_model_id, traffic_split={}
    )
    print(response)


undeploy_model(deployed_model_id, endpoint_name)

# Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:

- Dataset
- Model
- Endpoint
- Cloud Storage Bucket

In [None]:
delete_dataset = True
delete_model = True
delete_endpoint = True
delete_bucket = True

# Delete the dataset using the AI Platform (Unified) fully qualified identifier for the dataset
try:
    if delete_dataset:
        clients["dataset"].delete_dataset(name=dataset["name"])
except Exception as e:
    print(e)

# Delete the model using the AI Platform (Unified) fully qualified identifier for the model
try:
    if delete_model:
        clients["model"].delete_model(name=model_to_deploy_name)
except Exception as e:
    print(e)

# Delete the endpoint using the AI Platform (Unified) fully qualified identifier for the endpoint
try:
    if delete_endpoint:
        clients["endpoint"].delete_endpoint(name=endpoint_name)
except Exception as e:
    print(e)

if delete_bucket and "BUCKET_NAME" in globals():
    ! gsutil rm -r gs://$BUCKET_NAME

# Collect any unclaimed memory
import gc

gc.collect()