In [1]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

<table align="left">
  <td>
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/prediction/custom_prediction_routines/SDK_Pytorch_Custom_Predict.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/community/prediction/custom_prediction_routines/SDK_Pytorch_Custom_Predict.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      Open in Vertex AI Workbench
    </a>
  </td>
</table>

## Overview

This tutorial demonstrates how to use the Vertex AI SDK to build a custom container that uses the Custom Prediction Routine model server to serve a PyTorch model on Vertex AI Prediction.



### Dataset

This tutorial uses R.A. Fisher's [Iris dataset](https://archive.ics.uci.edu/ml/datasets/iris), a small dataset that is popular for trying out machine learning techniques. Each instance has four numerical features, which are different measurements of a flower, and a target label that
marks it as one of three types of iris: Iris setosa, Iris versicolor, or Iris virginica.

### Objective

The goal is to:
- Train a model that uses a flower's measurements as input to predict what type of iris it is.
- Save the model.
- Build a custom PyTorch serving container with custom preprocessing using the Custom Prediction Routine feature in the Vertex AI SDK.
- Test the built container locally.
- Upload and deploy the custom container to Vertex AI Prediction.

This tutorial focuses more on deploying this model with Vertex AI than on
the design of the model itself.

### Costs 

This tutorial uses billable components of Google Cloud:

* Vertex AI

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing), and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

### Set up your local development environment

**If you are using Vertex AI Workbench Notebooks**, your environment already meets
all the requirements to run this notebook. You can skip this step.

**Otherwise**, make sure your environment meets this notebook's requirements.
You need the following:

* Docker
* Git
* Google Cloud SDK (gcloud)
* Python 3
* virtualenv
* Jupyter notebook running in a virtual environment with Python 3

The Google Cloud guide to [Setting up a Python development
environment](https://cloud.google.com/python/setup) and the [Jupyter
installation guide](https://jupyter.org/install) provide detailed instructions
for meeting these requirements. The following steps provide a condensed set of
instructions:

1. [Install and initialize the Cloud SDK.](https://cloud.google.com/sdk/docs/)

1. [Install Python 3.](https://cloud.google.com/python/setup#installing_python)

1. [Install
   virtualenv](https://cloud.google.com/python/setup#installing_and_using_virtualenv)
   and create a virtual environment that uses Python 3. Activate the virtual environment.

1. To install Jupyter, run `pip install jupyter` on the
command-line in a terminal shell.

1. To launch Jupyter, run `jupyter notebook` on the command-line in a terminal shell.

1. Open this notebook in the Jupyter Notebook Dashboard.

### Install additional packages

Install the additional package dependencies that are not already in your notebook environment: the Vertex AI SDK, the Cloud Storage client library, Kubeflow Pipelines (KFP), and Google Cloud Pipeline Components.

In [67]:

! pip3 install --upgrade --user -q google-cloud-aiplatform
! pip3 install --upgrade --user -q google-cloud-storage
! pip3 install --upgrade --user -q kfp
! pip3 install --upgrade --user -q google-cloud-pipeline-components

In [1]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"
! python3 -c "import google.cloud.aiplatform; print('aiplatform SDK version: {}'.format(google.cloud.aiplatform.__version__))"

KFP SDK version: 2.3.0
google_cloud_pipeline_components version: 2.4.1
aiplatform SDK version: 1.35.0


### Restart the kernel

After you install the additional packages, you need to restart the notebook kernel so it can find the packages.

In [2]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

## Before you begin

### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

1. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).

1. [Enable the Vertex AI API and Compute Engine API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com,compute_component).

1. If you are running this notebook locally, you will need to install the [Cloud SDK](https://cloud.google.com/sdk).

1. Enter your project ID in the cell below. Then run the cell to make sure the
Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands and lines prefixed with `%` as IPython magic commands, and it interpolates Python variables written as `$VAR` or `{VAR}` into these commands.

#### Set your project ID

**If you don't know your project ID**, you may be able to get your project ID using `gcloud`.

In [1]:
import os

PROJECT_ID = "wortz-project-352116"

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  wortz-project-352116


Otherwise, set your project ID here.

In [2]:
if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

In [3]:
! gcloud config set project $PROJECT_ID

Updated property [core/project].


#### Region

You can also change the `REGION` variable, which is used for operations
throughout the rest of this notebook.  Below are regions supported for Vertex AI. We recommend that you choose the region closest to you.

- Americas: `us-central1`
- Europe: `europe-west4`
- Asia Pacific: `asia-east1`

You may not use a multi-regional bucket for training with Vertex AI. Not all regions provide support for all Vertex AI services.

Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations).

In [4]:
REGION = "us-central1"  # @param {type: "string"}

if REGION == "[your-region]":
    REGION = "us-central1"

### Configure project and resource names

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on the resources created, create a timestamp for each session and append it to the names of the resources you create in this tutorial.

In [5]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
EXPERIMENT_NAME = f'custom-pipe-pytorch-{TIMESTAMP}'

Configure GCP resource names.

In [6]:
MODEL_ARTIFACT_DIR = "pytorch-cpr-model-sdk"  # @param {type:"string"}
REPOSITORY = "custom-container-prediction-sdk"  # @param {type:"string"}
IMAGE = "pytorch-cpr-server-sdk"  # @param {type:"string"}
MODEL_DISPLAY_NAME = "pytorch-cpr-model-sdk"  # @param {type:"string"}

`REGION` - Used for operations
throughout the rest of this notebook. Make sure to [choose a region where
Vertex AI services are
available](https://cloud.google.com/vertex-ai/docs/general/locations#feature-availability). You may
not use a Multi-Regional Storage bucket for prediction with Vertex AI.

`MODEL_ARTIFACT_DIR` - Folder directory path to your model artifacts within a Cloud Storage bucket, for example: "my-models/fraud-detection/trial-4"

`REPOSITORY` - Name of the Artifact Registry repository to create or use.

`IMAGE` - Name of the container image that will be pushed.

`MODEL_DISPLAY_NAME` - Display name of Vertex AI Model resource.
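
The image URIs used later in this notebook follow the pattern `{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPOSITORY}/{IMAGE}`. As a minimal sketch of how these names combine, the helper `artifact_registry_image_uri` and the sample project ID `my-project` below are hypothetical and exist only for illustration:

```python
from typing import Optional


def artifact_registry_image_uri(
    region: str,
    project_id: str,
    repository: str,
    image: str,
    tag: Optional[str] = None,
) -> str:
    """Compose a Docker image URI for an Artifact Registry repository."""
    uri = f"{region}-docker.pkg.dev/{project_id}/{repository}/{image}"
    # The tag is optional; Docker assumes ":latest" when none is given.
    return f"{uri}:{tag}" if tag else uri


example_uri = artifact_registry_image_uri(
    "us-central1",
    "my-project",  # placeholder project ID
    "custom-container-prediction-sdk",
    "pytorch-cpr-server-sdk",
)
print(example_uri)
```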

### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**

To update your model artifacts without re-building the container, you must upload your model
artifacts and any custom code to Cloud Storage.

Set the name of your Cloud Storage bucket below. It must be unique across all
Cloud Storage buckets.

In [40]:
BUCKET_NAME = "pytorch-example-jsw"  # @param {type:"string"}
BUCKET_URI = f"gs://{BUCKET_NAME}"
MODEL_BLOB = "models/custom/model.pt"
MODEL_URI = f"{BUCKET_URI}/models/custom/"
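
The training script takes the bucket name and the blob path as separate arguments, while the pipeline takes the `gs://` directory URI. The sketch below shows how the pieces relate; `split_gcs_uri` is a hypothetical helper and `my-bucket` is a placeholder name, neither is part of the notebook:

```python
from typing import Tuple


def split_gcs_uri(uri: str) -> Tuple[str, str]:
    """Split 'gs://bucket/path/to/blob' into ('bucket', 'path/to/blob')."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not a Cloud Storage URI: {uri!r}")
    # Everything up to the first "/" is the bucket; the rest is the blob path.
    bucket, _, blob = uri[len(prefix):].partition("/")
    return bucket, blob


bucket, blob = split_gcs_uri("gs://my-bucket/models/custom/model.pt")
print(bucket, blob)
```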

In [8]:
if BUCKET_NAME == "" or BUCKET_NAME is None or BUCKET_NAME == "[your-bucket-name]":
    BUCKET_NAME = PROJECT_ID + "-aip-" + TIMESTAMP
    BUCKET_URI = f"gs://{BUCKET_NAME}"

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [9]:
! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

Creating gs://pytorch-example-jsw/...
ServiceException: 409 A Cloud Storage bucket named 'pytorch-example-jsw' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


Finally, validate access to your Cloud Storage bucket by examining its contents:

In [10]:
! gsutil ls -al $BUCKET_URI

                                 gs://pytorch-example-jsw//
                                 gs://pytorch-example-jsw/pipeline_root/


### Set up directories and constants

Choose the directory in which to put all of your custom code.

In [11]:
SERVING_APP_DIR = "serving_app"  # @param {type:"string"}

Choose the directory in which to put your trained models.

In [12]:
# LOCAL_MODEL_ARTIFACTS_DIR = "model_artifacts"  # @param {type:"string"}

In [13]:
TRAINER_DIR = "trainer"

In [14]:
# Training constants
APP_NAME = 'torch-cpr-train-example'
# TRAIN_REPO_NAME=f'{APP_NAME}-train' # Name of repository in which we will store our custom training image
TRAIN_IMAGE_URI = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPOSITORY}/{APP_NAME}-train:latest"

In [15]:
SERVER_IMAGE = "pytorch-custom-prediction"  # @param {type:"string"} 
SERVING_IMAGE_URI=f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPOSITORY}/{SERVER_IMAGE}"

In [16]:
PIPELINE_ROOT_PATH = f'gs://{BUCKET_NAME}/pipeline_root'

PIPELINE_JSON_SPEC_LOCAL = "custom_pipeline_spec.json"

In [17]:
%mkdir $SERVING_APP_DIR
# %mkdir $LOCAL_MODEL_ARTIFACTS_DIR
%mkdir $TRAINER_DIR

mkdir: cannot create directory ‘serving_app’: File exists
mkdir: cannot create directory ‘trainer’: File exists


### Configure the artifact registry

Configure Docker to access Artifact Registry. After you build the images later in this notebook, you push them to your Artifact Registry repository.

In [28]:
# !gcloud services list

If `artifactregistry.googleapis.com` is not enabled in your project, enable the API before proceeding.

In [29]:
!gcloud services enable artifactregistry.googleapis.com

ERROR: (gcloud.services.enable) PERMISSION_DENIED: Permission denied to enable service [artifactregistry.googleapis.com]
Help Token: AVzH8v1JABU_5AWsULciBElpAw0ajBAQ195xH0Lxh5wyhJhQtIuDQgQK3OPqVpmCvug_SHma4W1dAMyFcGXhxJKNqNmpjeAa2OZqOjswpG6IAys5
- '@type': type.googleapis.com/google.rpc.PreconditionFailure
  violations:
  - subject: ?error_code=110002&service=serviceusage.googleapis.com&permission=serviceusage.services.enable&resource=wortz-project-352116
    type: googleapis.com
- '@type': type.googleapis.com/google.rpc.ErrorInfo
  domain: serviceusage.googleapis.com
  metadata:
    permission: serviceusage.services.enable
    resource: wortz-project-352116
    service: serviceusage.googleapis.com
  reason: AUTH_PERMISSION_DENIED


In [30]:
!gcloud artifacts repositories create {REPOSITORY} \
    --repository-format=docker \
    --location=$REGION

ERROR: (gcloud.artifacts.repositories.create) ALREADY_EXISTS: the repository already exists


In [31]:
!gcloud auth configure-docker {REGION}-docker.pkg.dev --quiet


{
  "credHelpers": {
    "gcr.io": "gcloud",
    "us.gcr.io": "gcloud",
    "eu.gcr.io": "gcloud",
    "asia.gcr.io": "gcloud",
    "staging-k8s.gcr.io": "gcloud",
    "marketplace.gcr.io": "gcloud",
    "us-central1-docker.pkg.dev": "gcloud"
  }
}
Adding credentials for: us-central1-docker.pkg.dev
gcloud credential helpers already registered correctly.


## Create train.py for pytorch

In [18]:
%%writefile $TRAINER_DIR/train.py

import argparse
from urllib.request import urlretrieve
import torch
from torch.autograd import Variable
import pandas as pd
from google.cloud import storage

def train(args):

    DATA_DIR = "."

    LOCAL_DATA_FILE = f"{DATA_DIR}/iris.csv"

    urlretrieve(
        "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data",
        LOCAL_DATA_FILE,
    )

    ### Build a PyTorch NN Classifier
    print("PyTorch Version: {}".format(torch.__version__))


    CLASS_VOCAB = ["setosa", "versicolor", "virginica"]


    # Step 1. Load data
    # In this step, we are going to:

    # Load the data to Pandas Dataframe.
    # Convert the class feature (species) from string to a numeric indicator.
    # Split the Dataframe into input feature (xtrain) and target feature (ytrain).

    datatrain = pd.read_csv(
        LOCAL_DATA_FILE,
        names=["sepal_length", "sepal_width", "petal_length", "petal_width", "species"],
    )

    # change string value to numeric
    datatrain.loc[datatrain["species"] == "Iris-setosa", "species"] = 0
    datatrain.loc[datatrain["species"] == "Iris-versicolor", "species"] = 1
    datatrain.loc[datatrain["species"] == "Iris-virginica", "species"] = 2
    datatrain = datatrain.apply(pd.to_numeric)

    # change dataframe to array
    datatrain_array = datatrain.values

    # split x and y (feature and target)
    xtrain = datatrain_array[:, :4]
    ytrain = datatrain_array[:, 4]

    input_features = xtrain.shape[1]
    num_classes = len(CLASS_VOCAB)

    print("Records loaded: {}".format(len(xtrain)))
    print("Number of input features: {}".format(input_features))
    print("Number of classes: {}".format(num_classes))


    # Step 2. Set model parameters
    # You can try different values for hidden_units or learning_rate.

    HIDDEN_UNITS = 10
    LEARNING_RATE = 0.1

    # Step 3. Define the PyTorch NN model
    # Here, we build a neural network with one hidden layer and a Softmax output layer for classification.

    model = torch.nn.Sequential(
        torch.nn.Linear(input_features, HIDDEN_UNITS),
        torch.nn.Sigmoid(),
        torch.nn.Linear(HIDDEN_UNITS, num_classes),
        torch.nn.Softmax(dim=1),  # normalize across the classes of each row
    )

    loss_metric = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)

    # Step 4. Train the model
    # We are going to train the model for NUM_EPOCHS epochs.

    NUM_EPOCHS = 10000

    for epoch in range(NUM_EPOCHS):

        x = Variable(torch.Tensor(xtrain).float())
        y = Variable(torch.Tensor(ytrain).long())
        optimizer.zero_grad()
        y_pred = model(x)
        loss = loss_metric(y_pred, y)
        loss.backward()
        optimizer.step()
        if (epoch) % 1000 == 0:
            print(
                "Epoch [{}/{}] Loss: {}".format(
                    epoch + 1, NUM_EPOCHS, round(loss.item(), 3)
                )
            )

    print("Epoch [{}/{}] Loss: {}".format(epoch + 1, NUM_EPOCHS, round(loss.item(), 3)))
    
    # Save the model to GCS

    # The first argument of storage.Client is the project, not the bucket.
    storage_client = storage.Client(project=args.project_id)
    bucket = storage_client.bucket(args.model_bucket)
    blob = bucket.blob(args.model_blob)
    with blob.open("wb", ignore_flush=True) as f:
        torch.save(model, f)
        
    return model

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Pytorch Iris model')
    parser.add_argument('--project_id', type=str, help='GCP Project ID')
    parser.add_argument('--model_bucket', type=str, help='Model Bucket Name')
    parser.add_argument('--model_blob', type=str, help='Model Blob Path')
    
    args = parser.parse_args()
    
    train(args) #will return a torch model artifact

Overwriting trainer/train.py
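
The script reads its settings from three command-line flags. As a quick check of the flag wiring that needs neither PyTorch nor Cloud Storage, the same parser can be rebuilt and fed a sample argument list. This is only a sketch: `build_parser` is a hypothetical helper, and the values are placeholders rather than real resources.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Rebuild the argument parser defined at the bottom of trainer/train.py."""
    parser = argparse.ArgumentParser(description="Pytorch Iris model")
    parser.add_argument("--project_id", type=str, help="GCP Project ID")
    parser.add_argument("--model_bucket", type=str, help="Model Bucket Name")
    parser.add_argument("--model_blob", type=str, help="Model Blob Path")
    return parser


# Placeholder values with the same shape as the arguments given to the training job.
sample_args = build_parser().parse_args(
    [
        "--project_id", "my-project",
        "--model_bucket", "my-bucket",
        "--model_blob", "models/custom/model.pt",
    ]
)
print(sample_args.model_bucket, sample_args.model_blob)
```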


## Build a custom serving container using the CPR model server

Now that the training script has been written, it's time to build the custom serving container. Typically, building a serving container requires writing model server code. However, with the Custom Prediction Routine feature, Vertex AI Prediction provides a model server that can be used out of the box.

A custom serving container contains the following 3 pieces of code:
1. [Model Server](https://github.com/googleapis/python-aiplatform/blob/main/google/cloud/aiplatform/prediction/model_server.py)
    * HTTP server that hosts the model.
    * Responsible for setting up routes/ports/etc.
1. [Request Handler](https://github.com/googleapis/python-aiplatform/blob/main/google/cloud/aiplatform/prediction/handler.py)
    * Responsible for webserver aspects of handling a request, such as deserializing the request body, serializing the response, setting response headers, etc.
    * In this example, we will use the default Handler, `google.cloud.aiplatform.prediction.handler.PredictionHandler` provided in the SDK.
1. [Predictor](https://github.com/googleapis/python-aiplatform/blob/main/google/cloud/aiplatform/prediction/predictor.py)
    * Responsible for the ML logic for processing a prediction request.

Each of these three pieces can be customized based on the requirements of the custom container. In this example, we will only be implementing the `Predictor`.


A [`Predictor`](https://github.com/googleapis/python-aiplatform/blob/main/google/cloud/aiplatform/prediction/predictor.py) must implement the following interface:

```
from abc import ABC, abstractmethod
from typing import Any


class Predictor(ABC):
    """Interface of the Predictor class for Custom Prediction Routines.
    The Predictor is responsible for the ML logic for processing a prediction request.
    Specifically, the Predictor must define:
    (1) How to load all model artifacts used during prediction into memory.
    (2) The logic that should be executed at predict time.
    When using the default PredictionHandler, the Predictor will be invoked as follows:
      predictor.postprocess(predictor.predict(predictor.preprocess(prediction_input)))
    """

    def __init__(self):
        return

    @abstractmethod
    def load(self, artifacts_uri: str) -> None:
        """Loads the model artifact.
        Args:
            artifacts_uri (str):
                Required. The value of the environment variable AIP_STORAGE_URI.
        """
        pass

    def preprocess(self, prediction_input: Any) -> Any:
        """Preprocesses the prediction input before doing the prediction.
        Args:
            prediction_input (Any):
                Required. The prediction input that needs to be preprocessed.
        Returns:
            The preprocessed prediction input.
        """
        return prediction_input

    @abstractmethod
    def predict(self, instances: Any) -> Any:
        """Performs prediction.
        Args:
            instances (Any):
                Required. The instance(s) used for performing prediction.
        Returns:
            Prediction results.
        """
        pass

    def postprocess(self, prediction_results: Any) -> Any:
        """Postprocesses the prediction results.
        Args:
            prediction_results (Any):
                Required. The prediction results.
        Returns:
            The postprocessed prediction results.
        """
        return prediction_results
```
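
To make the call order concrete, here is a dependency-free sketch. `ToyPredictor` is a hypothetical stand-in that mimics the interface above without importing the SDK, and it scores each instance with a hard-coded rule on petal length instead of a trained model.

```python
from typing import Any, Dict, List


class ToyPredictor:
    """Hypothetical stand-in that follows the Predictor call order."""

    def load(self, artifacts_uri: str) -> None:
        # A real predictor would download and deserialize model artifacts here.
        self._class_names = ["setosa", "versicolor", "virginica"]

    def preprocess(self, prediction_input: Dict[str, Any]) -> List[List[float]]:
        # Pull the instances out of the request body.
        return prediction_input["instances"]

    def predict(self, instances: List[List[float]]) -> List[int]:
        # Illustrative rule only: bucket each instance by petal length (third feature).
        return [0 if row[2] < 2.5 else 1 if row[2] < 5.0 else 2 for row in instances]

    def postprocess(self, prediction_results: List[int]) -> Dict[str, List[str]]:
        # Map class indices back to readable labels.
        return {"predictions": [self._class_names[i] for i in prediction_results]}


toy = ToyPredictor()
toy.load("unused-in-this-sketch")
request_body = {"instances": [[6.7, 3.1, 4.7, 1.5], [4.6, 3.1, 1.5, 0.2]]}
# Same chaining that the default PredictionHandler applies.
response = toy.postprocess(toy.predict(toy.preprocess(request_body)))
print(response)
```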

First, implement a custom `Predictor` that loads the model in `load`. The `preprocess` method converts the request instances to a tensor, and the model is then used at `predict` time.

Custom Prediction Routine supports running the containers locally so that you can test your images. You can pass either a GCS path or a local path when testing your images locally.
- You need to set up credentials if you pass a GCS path.
- You need to support loading your models both remotely and locally in your `Predictor` if you want to test by passing a local path.

Vertex SDK provides a [function `download_model_artifacts`](https://github.com/googleapis/python-aiplatform/blob/main/google/cloud/aiplatform/utils/prediction_utils.py) to help you download model artifacts from either GCS paths or local paths. See the example in the `load` function below.

In [19]:
%%writefile $SERVING_APP_DIR/predictor.py

import pandas as pd
import pickle
import torch
from typing import Dict

from google.cloud.aiplatform.prediction.predictor import Predictor
from google.cloud.aiplatform.utils import prediction_utils


class CustomPyTorchPredictor(Predictor):
    
    def __init__(self):
        self._class_names = ["setosa", "versicolor", "virginica"]
    
    def load(self, artifacts_uri: str):
        """Loads the model artifacts."""
        prediction_utils.download_model_artifacts(artifacts_uri)

        self._model = torch.load("model.pt")

    def preprocess(self, prediction_input: Dict) -> torch.Tensor:
        instances = prediction_input["instances"]
        data = pd.DataFrame(instances).values
        return torch.Tensor(data)

    @torch.inference_mode()
    def predict(self, instances: torch.Tensor) -> torch.Tensor:
        """Performs prediction."""
        outputs = self._model(instances)
        _ , predicted = torch.max(outputs, 1)
        return predicted

    def postprocess(self, prediction_results: torch.Tensor) -> Dict:
        # Convert the tensor of class indices to plain ints before looking up the labels.
        return {"predictions": [self._class_names[i] for i in prediction_results.tolist()]}

Overwriting serving_app/predictor.py


To build a custom container, you also need an entrypoint for the image that starts the model server. With the Custom Prediction Routine feature, the Vertex AI SDK can populate this entrypoint for you from the custom predictor you provide. This notebook instead writes a small FastAPI entrypoint (`main.py`) and a Dockerfile by hand in the cells that follow.

Write the dependencies that will be installed in the images to `requirements.txt`.

#### Write the requirements file

The training image is built from the current working directory, while the custom predictor lives in `SERVING_APP_DIR`, so the file is written here first and then copied into `SERVING_APP_DIR`.

In [20]:
%%writefile requirements.txt
fastapi
uvicorn
pandas
torch
google-cloud-storage
google-cloud-aiplatform[prediction]

Overwriting requirements.txt


In [21]:
!cp requirements.txt $SERVING_APP_DIR/requirements.txt

### GCP Endpoint Boilerplate Predictor `main.py`

In [22]:
%%writefile $SERVING_APP_DIR/main.py


import os

from fastapi import FastAPI, Request

# Import the concrete predictor, not the abstract Predictor base class.
from predictor import CustomPyTorchPredictor

app = FastAPI()

# load() returns None, so keep a reference to the predictor object itself.
predictor = CustomPyTorchPredictor()
predictor.load(artifacts_uri=os.environ["AIP_STORAGE_URI"])


@app.get(os.environ["AIP_HEALTH_ROUTE"], status_code=200)
def health():
    return {}


@app.post(os.environ["AIP_PREDICT_ROUTE"])
async def predict(request: Request):
    body = await request.json()
    # preprocess builds the tensor, predict runs the model,
    # and postprocess returns a JSON-serializable {"predictions": [...]} dict.
    return predictor.postprocess(predictor.predict(predictor.preprocess(body)))

Overwriting serving_app/main.py
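
Vertex AI sets `AIP_STORAGE_URI`, `AIP_HEALTH_ROUTE`, `AIP_PREDICT_ROUTE`, and `AIP_HTTP_PORT` inside the serving container, so `main.py` raises a `KeyError` at import time when they are missing. For local experiments, one option is to read them with fallbacks. The sketch below assumes a hypothetical helper `read_serving_config`; the fallback values are illustrative choices for local runs, not values that Vertex AI guarantees.

```python
import os
from typing import Dict, Mapping


def read_serving_config(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Read the AIP_* serving settings, using local fallbacks when unset."""
    return {
        "storage_uri": env.get("AIP_STORAGE_URI", "."),  # assumed local artifact dir
        "health_route": env.get("AIP_HEALTH_ROUTE", "/health"),  # assumed local route
        "predict_route": env.get("AIP_PREDICT_ROUTE", "/predict"),  # assumed local route
        "http_port": env.get("AIP_HTTP_PORT", "8080"),
    }


# Simulate an environment where only the port has been set.
config = read_serving_config({"AIP_HTTP_PORT": "8080"})
print(config["health_route"], config["predict_route"], config["http_port"])
```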


In [23]:
%%writefile $SERVING_APP_DIR/prestart.sh
#!/bin/bash
export PORT=$AIP_HTTP_PORT

Overwriting serving_app/prestart.sh


In [24]:
SERVING_APP_DIR

'serving_app'

In [25]:
# Make the serving directory a package
!touch $SERVING_APP_DIR/__init__.py

In [26]:
%%writefile $SERVING_APP_DIR/Dockerfile

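FROM us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13.py310:latest
# Run from /app so that "uvicorn main:app" can import main.py
WORKDIR /app

COPY requirements.txt /requirements.txt
RUN pip install -r /requirements.txt

COPY . /app
# Vertex AI passes the serving port in AIP_HTTP_PORT (8080 by default)
EXPOSE 8080

CMD ["sh", "-c", "uvicorn main:app --host 0.0.0.0 --port $AIP_HTTP_PORT"]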

Overwriting serving_app/Dockerfile


#### Create the remote serving image

In [27]:
!docker build -t $SERVING_IMAGE_URI $SERVING_APP_DIR/. -q
# !gcloud builds submit -t $REMOTE_IMAGE_NAME .

sha256:c35981d51458e93ed290d78d23772d8b542aeadf6772970034c06d1bb76c95ef


#### Push the serving image to the artifact repo

In [28]:
! docker push $SERVING_IMAGE_URI -q


## Create the Dockerfile for our custom training container

The [Dockerfile](https://docs.docker.com/engine/reference/builder/) specifies how to build our custom container image.

This Dockerfile specifies that we want to:
1. Use a Vertex AI [prebuilt container for custom training](https://cloud.google.com/vertex-ai/docs/training/pre-built-containers) as a base image.
2. Install the required dependencies specified in our requirements.txt file.
3. Copy our custom training script to the container image.
4. Run our custom training script when the container starts up.

In [29]:
%%writefile Dockerfile

# Use a Vertex AI prebuilt PyTorch training container as the parent image
FROM us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13.py310:latest

WORKDIR /

COPY requirements.txt /requirements.txt

# Install any needed packages specified in requirements.txt
RUN pip install --upgrade pip
RUN pip install --no-cache-dir -r requirements.txt

# Copies the trainer code to the Docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.train"]

Overwriting Dockerfile


### Build our custom training image

The steps required to build and publish our image are:

1. Build the Docker image from the current directory.
2. Push the image to our Artifact Registry repository.

In [30]:
! docker build ./ -t $TRAIN_IMAGE_URI --quiet

sha256:db156c342b4d93441a79f8e276ef28e0776c7f0051684ea25757c0b5235fa95c


#### Push the train image to the artifact repo

In [31]:
! docker push $TRAIN_IMAGE_URI -q

us-central1-docker.pkg.dev/wortz-project-352116/custom-container-prediction-sdk/torch-cpr-train-example-train:latest


#### The training and serving images are now available in Artifact Registry

### Create the pipeline for training and endpoint deployment

In [32]:
### Import libs
# Kubeflow Pipelines (KFP)
import kfp
from kfp import compiler, dsl
from kfp.dsl import component, Input, Output, Artifact

# Google Cloud Pipeline Components (GCPC)
from google_cloud_pipeline_components.v1 import dataset, custom_job
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp

VERSION = "v1"
PIPELINE_NAME = f"{VERSION}-custom-prediciton"

In [33]:
TRAINING_ARGS = [
        "--project_id",
        PROJECT_ID,
        "--model_bucket",
        BUCKET_NAME,
        "--model_blob",
        MODEL_BLOB
]

WORKER_POOL_SPEC = [
    {
        "machine_spec": {
            "machine_type": "n1-standard-4",
        },
        "replica_count": 1,
        "container_spec": {
            "image_uri": TRAIN_IMAGE_URI,
            "args": TRAINING_ARGS
        },
    }
]
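
The worker pool spec is a plain list of dictionaries, so it can be assembled by a small function when several variants are needed. This is a sketch under assumptions: `build_worker_pool_spec` is a hypothetical helper, and the image URI and arguments below are placeholders.

```python
from typing import Any, Dict, List


def build_worker_pool_spec(
    image_uri: str,
    args: List[str],
    machine_type: str = "n1-standard-4",
    replica_count: int = 1,
) -> List[Dict[str, Any]]:
    """Return a single-pool spec with the same keys as WORKER_POOL_SPEC."""
    return [
        {
            "machine_spec": {"machine_type": machine_type},
            "replica_count": replica_count,
            # Copy the args so later edits to the caller's list do not leak in.
            "container_spec": {"image_uri": image_uri, "args": list(args)},
        }
    ]


spec = build_worker_pool_spec(
    "us-central1-docker.pkg.dev/my-project/my-repo/my-train-image:latest",  # placeholder
    ["--model_bucket", "my-bucket"],  # placeholder arguments
)
print(spec[0]["machine_spec"]["machine_type"])
```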

In [34]:
MODEL_NAME = "custom_iris_pytorch"
ENDPOINT_NAME = f"{MODEL_NAME}_endpoint"

In [41]:
@kfp.dsl.pipeline(
    name=PIPELINE_NAME,
    description="MLOps pipeline for pytorch with cpr models",
    pipeline_root=PIPELINE_ROOT_PATH,
)
def pipeline(
    bucket_name: str = BUCKET_URI,
    display_name: str = PIPELINE_NAME,
    model_path: str = MODEL_URI,
    project_id: str = PROJECT_ID,
    model_name: str = MODEL_NAME,
    location: str = REGION,
    worker_pool_specs: list = WORKER_POOL_SPEC,
    serving_image_uri: str = SERVING_IMAGE_URI,
    endpoint_name: str = ENDPOINT_NAME
):

    # Train model
    model_training_op = custom_job.CustomTrainingJobOp(
        project=project_id,
        location=location,
        display_name="train-mlops-model",
        worker_pool_specs = worker_pool_specs,
    )
    
    
    importer_op = dsl.importer(
        artifact_uri=model_path,
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                "imageUri": serving_image_uri,
            },
        },
    ).after(model_training_op)

    model_upload_op = ModelUploadOp(
        project=project_id,
        display_name=model_name,
        unmanaged_container_model=importer_op.outputs["artifact"],
    ).after(importer_op)

    endpoint_create_op = EndpointCreateOp(
        project=project_id,
        display_name=endpoint_name,
    ).after(model_upload_op)

    model_deploy_op = ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        deployed_model_display_name=model_name,
        dedicated_resources_machine_type="n1-standard-4",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
    ).after(endpoint_create_op)
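
The `.after()` calls and the output-to-input wiring define a linear chain of tasks. As an illustration only (this is not how KFP schedules tasks internally), the same ordering can be written as a dependency graph and sorted with the standard library's `graphlib`, available in Python 3.9 and later. The task labels are shorthand for the ops above, not names that KFP uses.

```python
from graphlib import TopologicalSorter

# Each key depends on the tasks in its set, mirroring the pipeline definition.
dependencies = {
    "train": set(),
    "import_model": {"train"},
    "upload_model": {"import_model"},
    "create_endpoint": {"upload_model"},
    "deploy_model": {"create_endpoint", "upload_model"},
}

# static_order() yields every task after all of the tasks it depends on.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```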

#### Initialize the SDK

In [42]:
from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)

In [43]:
#### Compile the pipeline

# ! rm -f custom_container_pipeline_spec.json


! rm -f $PIPELINE_JSON_SPEC_LOCAL

kfp.compiler.Compiler().compile(
    pipeline_func=pipeline, package_path=PIPELINE_JSON_SPEC_LOCAL,
)

In [44]:
PIPELINES_FILEPATH = f'{PIPELINE_ROOT_PATH}/pipeline_spec.json'
print("PIPELINES_FILEPATH:", PIPELINES_FILEPATH)

!gsutil -q cp $PIPELINE_JSON_SPEC_LOCAL $PIPELINES_FILEPATH

PIPELINES_FILEPATH: gs://pytorch-example-jsw/pipeline_root/pipeline_spec.json


In [46]:
# Use a separate name so the job does not shadow the pipeline function above.
pipeline_job = aiplatform.PipelineJob(
    display_name=PIPELINE_NAME,
    template_path=PIPELINES_FILEPATH,
    pipeline_root=PIPELINE_ROOT_PATH,
    enable_caching=True,
)

pipeline_job.submit(experiment=EXPERIMENT_NAME)

Creating PipelineJob
PipelineJob created. Resource name: projects/679926387543/locations/us-central1/pipelineJobs/v1-custom-prediciton-20231012223552
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/679926387543/locations/us-central1/pipelineJobs/v1-custom-prediciton-20231012223552')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/v1-custom-prediciton-20231012223552?project=679926387543
Associating projects/679926387543/locations/us-central1/pipelineJobs/v1-custom-prediciton-20231012223552 to Experiment: custom-pipe-pytorch-20231012221837


## Send predictions

### Using Python SDK

In [None]:
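endpoint = aiplatform.Endpoint.list(filter=f'display_name="{ENDPOINT_NAME}"')[0]  # created by the pipeline; exists once the run has finished
endpoint.predict(instances=[[6.7, 3.1, 4.7, 1.5], [4.6, 3.1, 1.5, 0.2]])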

### Using REST

In [None]:
ENDPOINT_ID = endpoint.name
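
The REST and gcloud calls in this section read the request body from `instances.json`, which no earlier cell creates. A minimal sketch that writes it, reusing the two instances from the Python SDK example; the top-level `instances` key matches what the serving code reads from the request body:

```python
import json

# Same two flowers as in the SDK example: four measurements per instance.
request_body = {"instances": [[6.7, 3.1, 4.7, 1.5], [4.6, 3.1, 1.5, 0.2]]}

with open("instances.json", "w") as f:
    json.dump(request_body, f)

# Read the file back to confirm that it is valid JSON with the expected shape.
with open("instances.json") as f:
    written_body = json.load(f)
print(written_body)
```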

In [None]:
! curl \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
-d @instances.json \
https://{REGION}-aiplatform.googleapis.com/v1/projects/{PROJECT_ID}/locations/{REGION}/endpoints/{ENDPOINT_ID}:predict

### Using gcloud CLI

In [None]:
!gcloud ai endpoints predict $ENDPOINT_ID \
  --region=$REGION \
  --json-request=instances.json

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:

In [None]:
# Undeploy the model and delete the endpoint
endpoint.delete(force=True)

# Delete the model resource that the pipeline uploaded
for model in aiplatform.Model.list(filter=f'display_name="{MODEL_NAME}"'):
    model.delete()

# Delete the serving and training images from Artifact Registry
!gcloud artifacts docker images delete --quiet --delete-tags {SERVING_IMAGE_URI}
!gcloud artifacts docker images delete --quiet --delete-tags {TRAIN_IMAGE_URI}

delete_bucket = False

if delete_bucket or os.getenv("IS_TESTING"):
    ! gsutil rm -r $BUCKET_URI