In [36]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI Pipelines: Custom training with pre-built Google Cloud Pipeline Components

<table align="left">
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/master/notebooks/official/pipelines/custom_model_training_and_batch_prediction.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/master/notebooks/official/pipelines/custom_model_training_and_batch_prediction.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/master/notebooks/official/pipelines/custom_model_training_and_batch_prediction.ipynb">
      Open in Vertex AI Workbench
    </a>
  </td>
</table>
<br/><br/><br/>

## Overview


This tutorial demonstrates how to use Vertex AI Pipelines with pre-built Google Cloud Pipeline Components for custom training.

### Dataset

The dataset used for this tutorial is the [CIFAR10 dataset](https://www.tensorflow.org/datasets/catalog/cifar10) from [TensorFlow Datasets](https://www.tensorflow.org/datasets/catalog/overview). The version of the dataset you will use is built into TensorFlow. The trained model predicts which type of class an image is from ten classes: airplane, automobile, bird, cat, deer, dog, frog, horse, ship, or truck.

### Objective

In this tutorial, you create a custom image classification model using Vertex AI Pipelines with pre-built Google Cloud Pipeline Components for custom training.

The steps performed include:

- Train a custom model.
- Upload the trained model as a `Model` resource.
- Create an `Endpoint` resource.
- Deploy the `Model` resource to the `Endpoint` resource.

Learn more about [Google Cloud Pipeline Components](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline).

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing) and [Cloud Storage
pricing](https://cloud.google.com/storage/pricing), and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

### Set up your local development environment

If you are using Colab or Google Cloud Notebook, your environment already meets all the requirements to run this notebook. You can skip this step.

Otherwise, make sure your environment meets this notebook's requirements. You need the following:

- The Cloud Storage SDK
- Git
- Python 3
- virtualenv
- Jupyter notebook running in a virtual environment with Python 3

The Cloud Storage guide to [Setting up a Python development environment](https://cloud.google.com/python/setup) and the [Jupyter installation guide](https://jupyter.org/install) provide detailed instructions for meeting these requirements. The following steps provide a condensed set of instructions:

1. [Install and initialize the SDK](https://cloud.google.com/sdk/docs/).

2. [Install Python 3](https://cloud.google.com/python/setup#installing_python).

3. [Install virtualenv](Ihttps://cloud.google.com/python/setup#installing_and_using_virtualenv) and create a virtual environment that uses Python 3.

4. Activate that environment and run `pip3 install Jupyter` in a terminal shell to install Jupyter.

5. Run `jupyter notebook` on the command line in a terminal shell to launch Jupyter.

6. Open this notebook in the Jupyter Notebook Dashboard.


## Installation

Install the latest version of Vertex AI SDK for Python.

In [None]:
import os

# Google Cloud Notebook
if os.path.exists("/opt/deeplearning/metadata/env_version") or os.getenv("IS_TESTING"):
    USER_FLAG = "--user"
else:
    USER_FLAG = ""

! pip3 install --upgrade google-cloud-aiplatform $USER_FLAG

Install the latest GA version of *google-cloud-storage* library as well.

In [None]:
! pip3 install -U google-cloud-storage $USER_FLAG

Install the latest GA version of *google-cloud-pipeline-components* library as well.

In [None]:
! pip3 install kfp google-cloud-pipeline-components --upgrade $USER_FLAG

In [None]:
if os.getenv("IS_TESTING"):
    ! pip3 install --upgrade --force-reinstall $USER_FLAG tensorflow==2.5 kfp google-cloud-aiplatform google-cloud-storage google-cloud-pipeline-components

### Restart the kernel

Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [None]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

## Before you begin

### GPU runtime

This tutorial does not require a GPU runtime.

### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

2. [Make sure that billing is enabled for your project.](https://cloud.google.com/billing/docs/how-to/modify-project)

3. [Enable the Vertex AI APIs, Compute Engine APIs, and Cloud Storage.](https://console.cloud.google.com/flows/enableapi?apiid=ml.googleapis.com,compute_component,storage-component.googleapis.com)

4. [The Google Cloud SDK](https://cloud.google.com/sdk) is already installed in Google Cloud Notebook.

5. Enter your project ID in the cell below. Then run the  cell to make sure the
Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$`.

In [16]:
PROJECT_ID = "argolis-demo-project"  # @param {type:"string"}

In [3]:
if PROJECT_ID == "" or PROJECT_ID is None or PROJECT_ID == "[your-project-id]":
    # Get your GCP project id from gcloud
    shell_output = ! gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID:", PROJECT_ID)

In [17]:
print(PROJECT_ID)

argolis-demo-project


In [None]:
!gcloud config set project {PROJECT_ID}

#### Region

You can also change the `REGION` variable, which is used for operations
throughout the rest of this notebook.  Below are regions supported for Vertex AI. We recommend that you choose the region closest to you.

- Americas: `us-central1`
- Europe: `europe-west4`
- Asia Pacific: `asia-east1`

You may not use a multi-regional bucket for training with Vertex AI. Not all regions provide support for all Vertex AI services.

Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations).

In [18]:
REGION = "us-central1"  # @param {type: "string"}

#### Timestamp

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on resources created, create a timestamp for each instance session, and append the timestamp onto the name of resources you create in this tutorial.

In [19]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

### Authenticate your Google Cloud account

**If you are using Google Cloud Notebook**, your environment is already authenticated. Skip this step.

**If you are using Colab**, run the cell below and follow the instructions when prompted to authenticate your account via oAuth.

**Otherwise**, follow these steps:

In the Cloud Console, go to the [Create service account key](https://console.cloud.google.com/apis/credentials/serviceaccountkey) page.

**Click Create service account**.

In the **Service account name** field, enter a name, and click **Create**.

In the **Grant this service account access to project** section, click the Role drop-down list. Type "Vertex" into the filter box, and select **Vertex Administrator**. Type "Storage Object Admin" into the filter box, and select **Storage Object Admin**.

Click Create. A JSON file that contains your key downloads to your local environment.

Enter the path to your service account key as the GOOGLE_APPLICATION_CREDENTIALS variable in the cell below and run the cell.

In [None]:
# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

import os
import sys

# If on Google Cloud Notebook, then don't execute this code
if not os.path.exists("/opt/deeplearning/metadata/env_version"):
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''

### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**

When you initialize the Vertex AI SDK for Python, you specify a Cloud Storage staging bucket. The staging bucket is where all the data associated with your dataset and model resources are retained across sessions.

Set the name of your Cloud Storage bucket below. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.

In [20]:
BUCKET_NAME = "argolis-demo-senchan"  # @param {type:"string"}
BUCKET_URI = f"gs://{BUCKET_NAME}"

In [None]:
if BUCKET_URI == "" or BUCKET_URI is None or BUCKET_URI == "gs://[your-bucket-name]":
    BUCKET_URI = "gs://" + PROJECT_ID + "aip-" + TIMESTAMP

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l $REGION $BUCKET_URI

Finally, validate access to your Cloud Storage bucket by examining its contents:

In [None]:
! gsutil ls -al $BUCKET_URI

#### Service Account

**If you don't know your service account**, try to get your service account using `gcloud` command by executing the second cell below.

In [None]:
SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}

In [None]:
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your GCP project id from gcloud
    shell_output = !gcloud auth list 2>/dev/null
    SERVICE_ACCOUNT = shell_output[2].strip()
    print("Service Account:", SERVICE_ACCOUNT)

#### Set service account access for Vertex AI Pipelines

Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step -- you only need to run these once per service account.

In [None]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

### Set up variables

Next, set up some variables used throughout the tutorial.
### Import libraries and define constants

In [21]:
import google.cloud.aiplatform as aip

#### Vertex AI Pipelines constants

Setup up the following constants for Vertex AI Pipelines:

In [33]:
PIPELINE_ROOT = "{}/pipeline_root/tf_cifar".format(BUCKET_URI)

Additional imports.

In [9]:
!pip3 install tensorflow --user

Collecting tensorflow
  Downloading tensorflow-2.9.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (511.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m511.7/511.7 MB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting tensorflow-estimator<2.10.0,>=2.9.0rc0
  Downloading tensorflow_estimator-2.9.0-py2.py3-none-any.whl (438 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m438.7/438.7 kB[0m [31m42.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting astunparse>=1.6.0
  Downloading astunparse-1.6.3-py2.py3-none-any.whl (12 kB)
Collecting h5py>=2.9.0
  Downloading h5py-3.7.0-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (4.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.1/4.1 MB[0m [31m88.2 MB/s[0m eta [36m0:00:00[0m:00:01[0m
[?25hCollecting libclang>=13.0.0
  Downloading libclang-14.0.1-py2.py3-none-manylinux1_x86_64.whl (14.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [22]:
import tensorflow as tf
from google_cloud_pipeline_components.experimental.custom_job import utils
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component

## Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [23]:
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

#### Set hardware accelerators

You can set hardware accelerators for training and prediction.

Set the variables `TRAIN_GPU/TRAIN_NGPU` and `DEPLOY_GPU/DEPLOY_NGPU` to use a container image supporting a GPU and the number of GPUs allocated to the virtual machine (VM) instance. For example, to use a GPU container image with 4 Nvidia Telsa K80 GPUs allocated to each VM, you would specify:

    (aip.AcceleratorType.NVIDIA_TESLA_K80, 4)


Otherwise specify `(None, None)` to use a container image to run on a CPU.

Learn more about [ hardware accelerator support for your region](https://cloud.google.com/vertex-ai/docs/general/locations#accelerators).

*Note*: TF releases before 2.3 for GPU support will fail to load the custom model in this tutorial. It is a known issue and fixed in TF 2.3. This is caused by static graph ops that are generated in the serving function. If you encounter this issue on your own custom models, use a container image for TF 2.3 with GPU support.

In [25]:
import os

if os.getenv("IS_TESTING_TRAIN_GPU"):
    TRAIN_GPU, TRAIN_NGPU = (
        aip.gapic.AcceleratorType.NVIDIA_TESLA_K80,
        int(os.getenv("IS_TESTING_TRAIN_GPU")),
    )
else:
    TRAIN_GPU, TRAIN_NGPU = (None, None)

if os.getenv("IS_TESTING_DEPLOY_GPU"):
    DEPLOY_GPU, DEPLOY_NGPU = (
        aip.gapic.AcceleratorType.NVIDIA_TESLA_K80,
        int(os.getenv("IS_TESTING_DEPLOY_GPU")),
    )
else:
    DEPLOY_GPU, DEPLOY_NGPU = (None, None)

#### Set pre-built containers

Set the pre-built Docker container image for training and prediction.


For the latest list, see [Pre-built containers for training](https://cloud.google.com/ai-platform-unified/docs/training/pre-built-containers).


For the latest list, see [Pre-built containers for prediction](https://cloud.google.com/ai-platform-unified/docs/predictions/pre-built-containers).

In [26]:
if os.getenv("IS_TESTING_TF"):
    TF = os.getenv("IS_TESTING_TF")
else:
    TF = "2-1"

if TF[0] == "2":
    if TRAIN_GPU:
        TRAIN_VERSION = "tf-gpu.{}".format(TF)
    else:
        TRAIN_VERSION = "tf-cpu.{}".format(TF)
    if DEPLOY_GPU:
        DEPLOY_VERSION = "tf2-gpu.{}".format(TF)
    else:
        DEPLOY_VERSION = "tf2-cpu.{}".format(TF)
else:
    if TRAIN_GPU:
        TRAIN_VERSION = "tf-gpu.{}".format(TF)
    else:
        TRAIN_VERSION = "tf-cpu.{}".format(TF)
    if DEPLOY_GPU:
        DEPLOY_VERSION = "tf-gpu.{}".format(TF)
    else:
        DEPLOY_VERSION = "tf-cpu.{}".format(TF)

TRAIN_IMAGE = "gcr.io/cloud-aiplatform/training/{}:latest".format(TRAIN_VERSION)
DEPLOY_IMAGE = "gcr.io/cloud-aiplatform/prediction/{}:latest".format(DEPLOY_VERSION)

print("Training:", TRAIN_IMAGE, TRAIN_GPU, TRAIN_NGPU)
print("Deployment:", DEPLOY_IMAGE, DEPLOY_GPU, DEPLOY_NGPU)

Training: gcr.io/cloud-aiplatform/training/tf-cpu.2-1:latest None None
Deployment: gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-1:latest None None


#### Set machine type

Next, set the machine type to use for training and prediction.

- Set the variables `TRAIN_COMPUTE` and `DEPLOY_COMPUTE` to configure  the compute resources for the VMs you will use for for training and prediction.
 - `machine type`
     - `n1-standard`: 3.75GB of memory per vCPU.
     - `n1-highmem`: 6.5GB of memory per vCPU
     - `n1-highcpu`: 0.9 GB of memory per vCPU
 - `vCPUs`: number of \[2, 4, 8, 16, 32, 64, 96 \]

*Note: The following is not supported for training:*

 - `standard`: 2 vCPUs
 - `highcpu`: 2, 4 and 8 vCPUs

*Note: You may also use n2 and e2 machine types for training and deployment, but they do not support GPUs*.

In [27]:
if os.getenv("IS_TESTING_TRAIN_MACHINE"):
    MACHINE_TYPE = os.getenv("IS_TESTING_TRAIN_MACHINE")
else:
    MACHINE_TYPE = "n1-standard"

VCPU = "4"
TRAIN_COMPUTE = MACHINE_TYPE + "-" + VCPU
print("Train machine type", TRAIN_COMPUTE)

if os.getenv("IS_TESTING_DEPLOY_MACHINE"):
    MACHINE_TYPE = os.getenv("IS_TESTING_DEPLOY_MACHINE")
else:
    MACHINE_TYPE = "n1-standard"

VCPU = "4"
DEPLOY_COMPUTE = MACHINE_TYPE + "-" + VCPU
print("Deploy machine type", DEPLOY_COMPUTE)

Train machine type n1-standard-4
Deploy machine type n1-standard-4


# Tutorial

Now you are ready to start creating your own custom model and training for CIFAR10.

### Examine the training package

#### Package layout

Before you start the training, you will look at how a Python package is assembled for a custom training job. When unarchived, the package contains the following directory/file layout.

- PKG-INFO
- README.md
- setup.cfg
- setup.py
- trainer
  - \_\_init\_\_.py
  - task.py

The files `setup.cfg` and `setup.py` are the instructions for installing the package into the operating environment of the Docker image.

The file `trainer/task.py` is the Python script for executing the custom training job. *Note*, when we referred to it in the worker pool specification, we replace the directory slash with a dot (`trainer.task`) and dropped the file suffix (`.py`).

#### Package Assembly

In the following cells, you will assemble the training package.

In [28]:
# Make folder for Python training script
! rm -rf custom
! mkdir custom

# Add package information
! touch custom/README.md

setup_cfg = "[egg_info]\n\ntag_build =\n\ntag_date = 0"
! echo "$setup_cfg" > custom/setup.cfg

setup_py = "import setuptools\n\nsetuptools.setup(\n\n    install_requires=[\n\n        'tensorflow_datasets==1.3.0',\n\n    ],\n\n    packages=setuptools.find_packages())"
! echo "$setup_py" > custom/setup.py

pkg_info = "Metadata-Version: 1.0\n\nName: CIFAR10 image classification\n\nVersion: 0.0.0\n\nSummary: Demostration training script\n\nHome-page: www.google.com\n\nAuthor: Google\n\nAuthor-email: aferlitsch@google.com\n\nLicense: Public\n\nDescription: Demo\n\nPlatform: Vertex"
! echo "$pkg_info" > custom/PKG-INFO

# Make the training subfolder
! mkdir custom/trainer
! touch custom/trainer/__init__.py

### Create a custom component for training the custom model

Next, create a lightweight Python function component for training the CIFAR10 image classification model.

In [29]:
# Single, Mirror and Multi-Machine Distributed Training for CIFAR-10


@component(
    base_image="tensorflow/tensorflow:latest",
    packages_to_install=["tensorflow_datasets", "opencv-python-headless"],
)
def custom_train_model(
    model_dir: str,
    lr: float = 0.01,
    epochs: int = 10,
    steps: int = 200,
    distribute: str = "single",
):

    import faulthandler
    import os
    import sys

    import tensorflow as tf
    import tensorflow_datasets as tfds
    from tensorflow.python.client import device_lib

    faulthandler.enable()
    tfds.disable_progress_bar()

    print("Component start")

    print("Python Version = {}".format(sys.version))
    print("TensorFlow Version = {}".format(tf.__version__))
    print("TF_CONFIG = {}".format(os.environ.get("TF_CONFIG", "Not found")))
    print("DEVICES", device_lib.list_local_devices())

    # Single Machine, single compute device
    if distribute == "single":
        if tf.test.is_gpu_available():
            strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
        else:
            strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
    # Single Machine, multiple compute device
    elif distribute == "mirror":
        strategy = tf.distribute.MirroredStrategy()
    # Multiple Machine, multiple compute device
    elif distribute == "multi":
        strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

    # Multi-worker configuration
    print("num_replicas_in_sync = {}".format(strategy.num_replicas_in_sync))

    # Preparing dataset
    BUFFER_SIZE = 10000
    BATCH_SIZE = 64

    def make_datasets_unbatched():

        # Scaling CIFAR10 data from (0, 255] to (0., 1.]
        def scale(image, label):
            image = tf.cast(image, tf.float32)
            image /= 255.0
            return image, label

        datasets, info = tfds.load(name="cifar10", with_info=True, as_supervised=True)
        return datasets["train"].map(scale).cache().shuffle(BUFFER_SIZE).repeat()

    # Build the Keras model
    def build_and_compile_cnn_model(lr: int = 0.01):
        model = tf.keras.Sequential(
            [
                tf.keras.layers.Conv2D(
                    32, 3, activation="relu", input_shape=(32, 32, 3)
                ),
                tf.keras.layers.MaxPooling2D(),
                tf.keras.layers.Conv2D(32, 3, activation="relu"),
                tf.keras.layers.MaxPooling2D(),
                tf.keras.layers.Flatten(),
                tf.keras.layers.Dense(10, activation="softmax"),
            ]
        )
        model.compile(
            loss=tf.keras.losses.sparse_categorical_crossentropy,
            optimizer=tf.keras.optimizers.SGD(learning_rate=lr),
            metrics=["accuracy"],
        )
        return model

    # Train the model
    NUM_WORKERS = strategy.num_replicas_in_sync
    # Here the batch size scales up by number of workers since
    # `tf.data.Dataset.batch` expects the global batch size.
    GLOBAL_BATCH_SIZE = BATCH_SIZE * NUM_WORKERS
    train_dataset = make_datasets_unbatched().batch(GLOBAL_BATCH_SIZE)

    with strategy.scope():
        # Creation of dataset, and model building/compiling need to be within
        # `strategy.scope()`.
        model = build_and_compile_cnn_model(lr)

    model.fit(x=train_dataset, epochs=epochs, steps_per_epoch=steps)
    model.save(model_dir)

    model_path_to_deploy = model_dir

    # Load the saved model
    local_model = tf.keras.models.load_model(model_dir)

    # Load evaluation data
    import numpy as np
    from tensorflow.keras.datasets import cifar10

    (_, _), (x_test, y_test) = cifar10.load_data()
    x_test = (x_test / 255.0).astype(np.float32)

    print(x_test.shape, y_test.shape)

    # Perform the model evaluation
    local_model.evaluate(x_test, y_test)

    # Serving function for image data
    CONCRETE_INPUT = "numpy_inputs"

    def _preprocess(bytes_input):
        decoded = tf.io.decode_jpeg(bytes_input, channels=3)
        decoded = tf.image.convert_image_dtype(decoded, tf.float32)
        resized = tf.image.resize(decoded, size=(32, 32))
        return resized

    @tf.function(input_signature=[tf.TensorSpec([None], tf.string)])
    def preprocess_fn(bytes_inputs):
        decoded_images = tf.map_fn(
            _preprocess, bytes_inputs, dtype=tf.float32, back_prop=False
        )
        return {
            CONCRETE_INPUT: decoded_images
        }  # User needs to make sure the key matches model's input

    @tf.function(input_signature=[tf.TensorSpec([None], tf.string)])
    def serving_fn(bytes_inputs):
        images = preprocess_fn(bytes_inputs)
        prob = m_call(**images)
        return prob

    m_call = tf.function(local_model.call).get_concrete_function(
        [tf.TensorSpec(shape=[None, 32, 32, 3], dtype=tf.float32, name=CONCRETE_INPUT)]
    )

    tf.saved_model.save(
        local_model, model_path_to_deploy, signatures={"serving_default": serving_fn}
    )

    # Get the serving function signature
    loaded = tf.saved_model.load(model_path_to_deploy)

    serving_input = list(
        loaded.signatures["serving_default"].structured_input_signature[1].keys()
    )[0]
    print("Serving function input:", serving_input)

    # Get test items
    test_image_1 = x_test[0]
    test_image_2 = x_test[1]
    print(test_image_1.shape)

    BUCKET_URI = model_dir + "/test"

    import cv2

    cv2.imwrite("tmp1.jpg", (test_image_1 * 255).astype(np.uint8))
    cv2.imwrite("tmp2.jpg", (test_image_2 * 255).astype(np.uint8))

    print("Writing jpg files")

    # Copy test item(s)
    # For the batch prediction, copy the test items over to your Cloud Storage bucket.
    test_item_1 = BUCKET_URI + "/tmp1.jpg"
    test_item_2 = BUCKET_URI + "/tmp2.jpg"

    with tf.io.gfile.GFile(test_item_1, "wb") as w:
        with tf.io.gfile.GFile("tmp1.jpg", "rb") as r:
            bytes = r.read()
            w.write(bytes)

    with tf.io.gfile.GFile(test_item_2, "wb") as w:
        with tf.io.gfile.GFile("tmp2.jpg", "rb") as r:
            bytes = r.read()
            w.write(bytes)

    # Make the batch input file
    import base64
    import json

    gcs_input_uri = BUCKET_URI + "/" + "test.jsonl"
    with tf.io.gfile.GFile(gcs_input_uri, "w") as f:
        bytes = tf.io.read_file(test_item_1)
        b64str = base64.b64encode(bytes.numpy()).decode("utf-8")
        data = {serving_input: {"b64": b64str}}
        f.write(json.dumps(data) + "\n")
        bytes = tf.io.read_file(test_item_2)
        b64str = base64.b64encode(bytes.numpy()).decode("utf-8")
        data = {serving_input: {"b64": b64str}}
        f.write(json.dumps(data) + "\n")

### Convert the component to a Vertex AI Custom Job

Next, use the `create_custom_training_job_op_from_component` method to convert the custom component into a Vertex AI Custom Job pre-built component.

In [30]:
custom_job_distributed_training_op = utils.create_custom_training_job_op_from_component(
    custom_train_model, replica_count=1
)

### Define the pipeline for the custom training job

Next, define the pipeline job, consisting of the tasks:

- Train the custom model.
- Upload the model to a Verex AI Model resource.
- Execute a batch prediction.

In [34]:
MODEL_DIR = BUCKET_URI + "/model"
MODEL_DISPLAY_NAME = f"train_deploy{TIMESTAMP}"


@dsl.pipeline(name="custom-model-training-sample-pipeline")
def pipeline(
    project: str = PROJECT_ID,
    model_display_name: str = MODEL_DISPLAY_NAME,
    model_dir: str = MODEL_DIR,
    lr: float = 0.01,
    epochs: int = 10,
    steps: int = 200,
    distribute: str = "single",
):
    from google_cloud_pipeline_components.types import artifact_types
    from google_cloud_pipeline_components.v1.batch_predict_job import \
        ModelBatchPredictOp
    from google_cloud_pipeline_components.v1.model import ModelUploadOp
#    from google_cloud_pipeline_components import ModelUploadOp, EndpointCreateOp
    from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,
                                                              ModelDeployOp)
    from kfp.v2.components import importer_node

    custom_producer_task = custom_job_distributed_training_op(
        model_dir=model_dir,
        lr=lr,
        epochs=epochs,
        steps=steps,
        distribute=distribute,
        project=PROJECT_ID,
        location=REGION,
        base_output_directory=PIPELINE_ROOT,
    )

    
    unmanaged_model_importer = importer_node.importer(
        artifact_uri=model_dir,
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                "imageUri": "us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-3:latest"
            }
        },
    )
    

    model_upload_op = ModelUploadOp(
        project=PROJECT_ID,
        display_name="model_display_name",
        unmanaged_container_model=unmanaged_model_importer.outputs["artifact"],
        #unmanaged_container_model="us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-3:latest",
    )
    model_upload_op.after(custom_producer_task)

    '''
    batch_predict_op = ModelBatchPredictOp(
        project=PROJECT_ID,
        job_display_name="batch_predict_job",
        model=model_upload_op.outputs["model"],
        gcs_source_uris=[MODEL_DIR + "/test/test.jsonl"],
        gcs_destination_output_uri_prefix=PIPELINE_ROOT,
        instances_format="jsonl",
        predictions_format="jsonl",
        model_parameters={},
        machine_type=DEPLOY_COMPUTE,
        starting_replica_count=1,
        max_replica_count=1,
    )
    '''
    
    endpoint_create_op = EndpointCreateOp(
        project=project,
        display_name="pipelines-created-endpoint",
        )

    
    model_deploy_op = ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        deployed_model_display_name=model_display_name,
        dedicated_resources_machine_type="n1-standard-16",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
    )

    model_deploy_op.after(model_upload_op)

### Compile and run the pipeline
Next, you compile the pipeline into a DAG and then exeute it.

In [35]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="custom_model_training_spec.json"
)

DISPLAY_NAME = "cifar10_" + TIMESTAMP

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="custom_model_training_spec.json",
    pipeline_root=PIPELINE_ROOT,
)

#job.run(service_account=SERVICE_ACCOUNT)

job.run(sync=False
       )

! rm custom_model_training_spec.json

Creating PipelineJob
PipelineJob created. Resource name: projects/278305018396/locations/us-central1/pipelineJobs/custom-model-training-sample-pipeline-20220718045733
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/278305018396/locations/us-central1/pipelineJobs/custom-model-training-sample-pipeline-20220718045733')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/custom-model-training-sample-pipeline-20220718045733?project=278305018396
PipelineJob projects/278305018396/locations/us-central1/pipelineJobs/custom-model-training-sample-pipeline-20220718045733 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/278305018396/locations/us-central1/pipelineJobs/custom-model-training-sample-pipeline-20220718045733 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/278305018396/locations/us-central1/pipelineJobs/custom-model-training-sample-pipeline-202207

### View custom training pipeline results

Finally, you will view the artifact outputs of each task in the pipeline.

In [None]:
import json

PROJECT_NUMBER = job.gca_resource.name.split("/")[1]
print(PROJECT_NUMBER)


def print_pipeline_output(job, output_task_name):
    JOB_ID = job.name
    print(JOB_ID)
    for _ in range(len(job.gca_resource.job_detail.task_details)):
        TASK_ID = job.gca_resource.job_detail.task_details[_].task_id
        EXECUTE_OUTPUT = (
            PIPELINE_ROOT
            + "/"
            + PROJECT_NUMBER
            + "/"
            + JOB_ID
            + "/"
            + output_task_name
            + "_"
            + str(TASK_ID)
            + "/executor_output.json"
        )
        GCP_RESOURCES = (
            PIPELINE_ROOT
            + "/"
            + PROJECT_NUMBER
            + "/"
            + JOB_ID
            + "/"
            + output_task_name
            + "_"
            + str(TASK_ID)
            + "/gcp_resources"
        )
        EVAL_METRICS = (
            PIPELINE_ROOT
            + "/"
            + PROJECT_NUMBER
            + "/"
            + JOB_ID
            + "/"
            + output_task_name
            + "_"
            + str(TASK_ID)
            + "/evaluation_metrics"
        )
        if tf.io.gfile.exists(EXECUTE_OUTPUT):
            ! gsutil cat $EXECUTE_OUTPUT
            return EXECUTE_OUTPUT
        elif tf.io.gfile.exists(GCP_RESOURCES):
            ! gsutil cat $GCP_RESOURCES
            return GCP_RESOURCES
        elif tf.io.gfile.exists(EVAL_METRICS):
            ! gsutil cat $EVAL_METRICS
            return EVAL_METRICS

    return None


print("model-upload")
artifacts = print_pipeline_output(job, "model-upload")
print("\n")
output = !gsutil cat $artifacts
print(output)
output = json.loads(output[0])
model_id = output["artifacts"]["model"]["artifacts"][0]["metadata"]["resourceName"]
print("model-batch-predict")
artifacts = print_pipeline_output(job, "model-batch-predict")
print("\n")
output = !gsutil cat $artifacts
output = json.loads(output[0])
batch_job_id = output["artifacts"]["batchpredictionjob"]["artifacts"][0]["metadata"][
    "resourceName"
]

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial.

In [None]:
job.delete()

model = aip.Model(model_id)
model.delete()

batch_job = aip.BatchPredictionJob(batch_job_id)
batch_job.delete()

# uncomment to delete your bucket
# ! gsutil rm -rf {BUCKET_URI}

In [None]:
!pip3 install tensorflow_datasets --user

Collecting tensorflow_datasets
  Downloading tensorflow_datasets-4.6.0-py3-none-any.whl (4.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.3/4.3 MB[0m [31m41.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting etils[epath]
  Downloading etils-0.6.0-py3-none-any.whl (98 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m98.1/98.1 kB[0m [31m14.0 MB/s[0m eta [36m0:00:00[0m
Collecting promise
  Downloading promise-2.3.tar.gz (19 kB)
  Preparing metadata (setup.py) ... [?25ldone
Collecting tensorflow-metadata
  Downloading tensorflow_metadata-1.9.0-py3-none-any.whl (51 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.0/51.0 kB[0m [31m9.8 MB/s[0m eta [36m0:00:00[0m
Collecting dill
  Downloading dill-0.3.5.1-py2.py3-none-any.whl (95 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m95.8/95.8 kB[0m [31m15.8 MB/s[0m eta [36m0:00:00[0m
Building wheels for collected packages: promise
  Buil

In [47]:
import tensorflow_datasets as tfds
datasets, info = tfds.load(name="cifar10", with_info=True, as_supervised=True)

  from .autonotebook import tqdm as notebook_tqdm


[1mDownloading and preparing dataset 162.17 MiB (download: 162.17 MiB, generated: 132.40 MiB, total: 294.58 MiB) to ~/tensorflow_datasets/cifar10/3.0.2...[0m


Dl Completed...: 0 url [00:00, ? url/s]
Dl Size...: 0 MiB [00:00, ? MiB/s][A

Dl Completed...:   0%|          | 0/1 [00:00<?, ? url/s]
Dl Size...: 0 MiB [00:00, ? MiB/s][A

Dl Completed...:   0%|          | 0/1 [00:01<?, ? url/s]
Dl Size...:   0%|          | 0/162 [00:01<?, ? MiB/s][A

Extraction completed...: 0 file [00:01, ? file/s][A[A
Dl Completed...:   0%|          | 0/1 [00:01<?, ? url/s] MiB][A
Dl Size...:   1%|          | 1/162 [00:01<03:25,  1.28s/ MiB][A

Dl Completed...:   0%|          | 0/1 [00:01<?, ? url/s]
Dl Size...:   1%|          | 2/162 [00:01<03:24,  1.28s/ MiB][A

Dl Completed...:   0%|          | 0/1 [00:01<?, ? url/s]
Dl Size...:   2%|▏         | 3/162 [00:01<03:22,  1.28s/ MiB][A

Dl Completed...:   0%|          | 0/1 [00:01<?, ? url/s]
Dl Size...:   2%|▏         | 4/162 [00:01<03:21,  1.28s/ MiB][A

Dl Completed...:   0%|          | 0/1 [00:01<?, ? url/s]
Dl Size...:   3%|▎         | 5/162 [00:01<03:20,  1.28s/ MiB][A

Extraction completed...: 0 file

PipelineJob projects/278305018396/locations/us-central1/pipelineJobs/custom-model-training-sample-pipeline-20220717020133 current state:
PipelineState.PIPELINE_STATE_RUNNING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/278305018396/locations/us-central1/pipelineJobs/custom-model-training-sample-pipeline-20220717020133 current state:
PipelineState.PIPELINE_STATE_RUNNING

Generating train examples...:  40%|████      | 20193/50000 [00:11<00:19, 1542.46 examples/s][A
Generating train examples...:  41%|████      | 20370/50000 [00:11<00:18, 1605.71 examples/s][A
Generating train examples...:  41%|████      | 20546/50000 [00:11<00:17, 1649.27 examples/s][A
Generating train examples...:  41%|████▏     | 20723/50000 [00:12<00:17, 1682.37 examples/s][A
Generating train examples...:  42%|████▏     | 20896/50000 [00:12<00:17, 1693.92 examples/s][A
Generating train examples...:  42%|████▏     | 21075/50000 [00:12<00:16, 1720.33 examples/s][A
Generating train examples...:  43%|████▎     | 21253/50000 [00:12<00:16, 1735.63 examples/s][A
Generating train examples...:  43%|████▎     | 21431/50000 [00:12<00:16, 1746.59 examples/s][A
Generating tra

[1mDataset cifar10 downloaded and prepared to ~/tensorflow_datasets/cifar10/3.0.2. Subsequent calls will reuse this data.[0m


In [50]:
from tensorflow.keras.datasets import cifar10

(_, _), (x_test, y_test) = cifar10.load_data()
x_test = (x_test / 255.0).astype(np.float32)

print(x_test.shape, y_test.shape)

# Perform the model evaluation
local_model.evaluate(x_test, y_test)

# Serving function for image data
CONCRETE_INPUT = "numpy_inputs"

def _preprocess(bytes_input):
    decoded = tf.io.decode_jpeg(bytes_input, channels=3)
    decoded = tf.image.convert_image_dtype(decoded, tf.float32)
    resized = tf.image.resize(decoded, size=(32, 32))
    return resized

@tf.function(input_signature=[tf.TensorSpec([None], tf.string)])
def preprocess_fn(bytes_inputs):
    decoded_images = tf.map_fn(
        _preprocess, bytes_inputs, dtype=tf.float32, back_prop=False
    )
    return {
        CONCRETE_INPUT: decoded_images
    }  # User needs to make sure the key matches model's input

@tf.function(input_signature=[tf.TensorSpec([None], tf.string)])
def serving_fn(bytes_inputs):
    images = preprocess_fn(bytes_inputs)
    prob = m_call(**images)
    return prob

m_call = tf.function(local_model.call).get_concrete_function(
    [tf.TensorSpec(shape=[None, 32, 32, 3], dtype=tf.float32, name=CONCRETE_INPUT)]
)

TypeError: 'PrefetchDataset' object is not subscriptable

In [53]:
from tensorflow.keras.datasets import cifar10
import numpy as np

In [54]:
(_, _), (x_test, y_test) = cifar10.load_data()
x_test = (x_test / 255.0).astype(np.float32)

In [61]:
x_test[0]

(32, 32, 3)

PipelineJob projects/278305018396/locations/us-central1/pipelineJobs/custom-model-training-sample-pipeline-20220717020133 current state:
PipelineState.PIPELINE_STATE_RUNNING


INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/278305018396/locations/us-central1/pipelineJobs/custom-model-training-sample-pipeline-20220717020133 current state:
PipelineState.PIPELINE_STATE_RUNNING


In [68]:
import json

In [67]:
type(x_test[0])

numpy.ndarray

NameError: name 'decode' is not defined

In [93]:
tf.image.decode_jpeg(loaded_data[0][0][0].shape, channels=3)

TypeError: Cannot convert (32, 32, 3) to EagerTensor of dtype string

In [83]:
loaded_data = cifar10.load_data()

(32, 32, 3)

In [70]:
json.dumps(x_test[0].tolist())

'[[[0.6196078658103943, 0.43921568989753723, 0.1921568661928177], [0.6235294342041016, 0.43529412150382996, 0.18431372940540314], [0.6470588445663452, 0.45490196347236633, 0.20000000298023224], [0.6509804129600525, 0.4627451002597809, 0.2078431397676468], [0.6274510025978088, 0.43921568989753723, 0.18039216101169586], [0.6117647290229797, 0.4274509847164154, 0.16078431904315948], [0.6352941393852234, 0.45098039507865906, 0.18431372940540314], [0.6235294342041016, 0.4431372582912445, 0.1764705926179886], [0.6196078658103943, 0.43529412150382996, 0.1725490242242813], [0.6235294342041016, 0.4431372582912445, 0.16078431904315948], [0.6313725709915161, 0.45490196347236633, 0.16078431904315948], [0.6274510025978088, 0.43529412150382996, 0.20392157137393951], [0.6313725709915161, 0.43529412150382996, 0.1921568661928177], [0.6509804129600525, 0.4588235318660736, 0.16078431904315948], [0.6627451181411743, 0.4588235318660736, 0.1764705926179886], [0.6666666865348816, 0.46666666865348816, 0.17254

In [71]:
REGION='us-central1'

In [72]:
ENDPOINT_NAME="pipelines-created-endpoint"
#instance = [[3,0,18,0,0,8.6625,0,0,1]]
ENDPOINT_ID = !(gcloud ai endpoints list --region=$REGION \
              --format='value(ENDPOINT_ID)' \
              --filter=display_name=$ENDPOINT_NAME \
              --sort-by=creationTimeStamp | tail -1)
ENDPOINT_ID = ENDPOINT_ID[1]
print(ENDPOINT_ID)

8550039912097775616


In [73]:
instance = json.dumps(x_test[0].tolist())

In [75]:
from google.cloud import aiplatform

In [81]:
!cat ~/input.json

{
  "instances": [
    {
      "data":
        "iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAIAAAD8GO2jAAAAB3RJTUUH2AoYFB0lQuPgxAAAB/ZJREFUSImVVlmPplURruWcd+ttpnt6mZ4JdsMwC0OUxAXcgka58Xf4O/wnmsi1N8QQ3EJMBA1ookNUZJh9pqe3r7/9Pe97tiovPiBR4cK6q0rqqarnpE49+OOf/FwksMHClsZaa5kIAJTIWGuZGQFSTEFFM+Q+uaRdiDknEWXIIfg+SMqQBVUg5jRzLgQfg/cp9yLG2FIyISmSISJEYkYkRCBEVFUFIGNQNQMIE+REJABQlmbzwpL33cNHJ4pGowJjBiBiYkNGETJmIcPEbJgsISEiAgACExHRwlNVAFEARRJiQCRCRMw5A8jm5vmqssxEzAAAAERIhITEzMxskBIDEBljmBmJEUAVgBAXCUxIGgu2WYQIiYmVRCTnfHJyOhgAALFhEUkxSc6AiIilIYk5oVJVs7HABlQFEYmYiEABERdDWKbVxpJ6wkwozMS8IBNVbOdUhRa9ICIiMrNlXl9tGks1g9n/0vrpYOpcIlOJKjIQMSEQLQbAqjAltdK3ZXU+MomKKBITKaOQNRxSJlJmQEOsZASt4YqgKcuMbK7vra4tmydH45AxCUkWUGUmIhBkjdlSpjSvDAoxMCcJiYQJgVlQVNUYQ4pIWSHX1rDQfDQ3wlVZz5M3qzWWl9bWVuuDo9F0FoCKJJSTEBKyBYAkXvsuBi4ay4qsZJTAEoAAKBtSAQaqyWyfr1fr8vxK9frPft+snt945gY6MAVl1bSzbrfPX37w4GTURuG673OM6lXJUFnX81HyEY0qEBVFSczeZwRRyIAIxKrAhNYYYo25G89O1y5eqBuyUzVLdanOqfaVtS9cuXg4mI3mCZar0cSNughEOefJ1K1s7gmRiIgqIhZFwaRIklJSwRhzH2Xug+Re

In [77]:
def endpoint_predict(
    project: str, location: str, instances: list, endpoint: str
):
    aiplatform.init(project=project, location=location)

    endpoint = aiplatform.Endpoint(endpoint)

    prediction = endpoint.predict(instances=instances)
    return prediction

endpoint_predict(PROJECT_ID, REGION, instance, ENDPOINT_ID)

InvalidArgument: 400 {
    "error": "Expected image (JPEG, PNG, or GIF), got unknown format starting with '['\n\t [[{{node map/while/DecodeJpeg}}]]"
}

INPUT_DATA_FILE='~/input2.json'

In [100]:
json_data = json.load(~/input2.json)

SyntaxError: invalid syntax (1071349511.py, line 1)

In [96]:
!curl -X POST --http2 -H "Authorization: Bearer $(gcloud auth print-access-token)" -H "Content-Type: application/json" https://us-central1-aiplatform.googleapis.com/v1/projects/${PROJECT_ID}/locations/us-central1/endpoints/${ENDPOINT_ID}:predict -d "@${INPUT_DATA_FILE}"

E0717 02:49:38.028815270    2312 backup_poller.cc:136]       Run client channel backup poller: {"created":"@1658026178.028539218","description":"pollset_work","file":"src/core/lib/iomgr/ev_epoll1_linux.cc","file_line":247,"referenced_errors":[{"created":"@1658026178.028530788","description":"Bad file descriptor","errno":9,"file":"src/core/lib/iomgr/ev_epoll1_linux.cc","file_line":732,"os_error":"Bad file descriptor","syscall":"epoll_wait"}]}
{
  "error": {
    "code": 403,
    "message": "Permission denied on resource project -demo-project.",
    "status": "PERMISSION_DENIED",
    "details": [
      {
        "@type": "type.googleapis.com/google.rpc.Help",
        "links": [
          {
            "description": "Google developer console API key",
            "url": "https://console.developers.google.com/project/-demo-project/apiui/credential"
          }
        ]
      },
      {
        "@type": "type.googleapis.com/google.rpc.ErrorInfo",
        "reason": "CONSUMER_INVALID",
     