In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


Orchestrate an ML workflow with Vertex AI Pipelines to train and deploy a PyTorch text classification model.

## Overview

This notebook extends the [previous notebook](./pytorch-text-classification-vertex-ai-train-tune-deploy.ipynb), which fine-tunes and deploys a pre-trained BERT model from the [HuggingFace Hub](https://huggingface.co/bert-base-cased) for a sentiment classification task. It shows how to automate and monitor a PyTorch-based ML workflow by orchestrating a pipeline in a serverless manner with [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction).

The notebook defines a pipeline with the [Kubeflow Pipelines v2 (`kfp.v2`) SDK](https://www.kubeflow.org/docs/components/pipelines/sdk-v2/) and submits it to the Vertex AI Pipelines service.

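### Dataset

The notebook uses the [IMDB movie review dataset](https://huggingface.co/datasets/imdb) from [Hugging Face Datasets](https://huggingface.co/datasets).

### Objective

Learn how to orchestrate a PyTorch ML workflow on [Vertex AI](https://cloud.google.com/vertex-ai), with an emphasis on Vertex AI's first-class support for training, deploying, and orchestrating PyTorch workflows.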

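### Table of Contents

This notebook covers the following sections:

---
- [High-Level Flow of Building a Pipeline](#High-Level-Flow-of-Building-a-Pipeline): Understand pipeline concepts and the pipeline schematic
- [Define the Pipeline Components](#Define-the-Pipeline-Components-for-PyTorch-based-ML-Workflow): Author custom pipeline components for the PyTorch-based ML workflow
- [Define Pipeline Specification](#Define-Pipeline-Specification): Author the pipeline specification for the PyTorch-based ML workflow with the KFP v2 SDK
- [Submit Pipeline](#Submit-Pipeline): Compile and execute the pipeline on Vertex AI Pipelines
- [Monitoring the Pipeline](#Monitoring-the-Pipeline): Monitor pipeline progress and view logs, lineage, artifacts, and pipeline runs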
---

### Costs

This tutorial uses billable components of Google Cloud Platform (GCP):

* [Vertex AI Workbench](https://cloud.google.com/vertex-ai-workbench)
* [Vertex AI Training](https://cloud.google.com/vertex-ai/docs/training/custom-training)
* [Vertex AI Predictions](https://cloud.google.com/vertex-ai/docs/predictions/getting-predictions)
* [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction)
* [Cloud Storage](https://cloud.google.com/storage)
* [Container Registry](https://cloud.google.com/container-registry)
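* [Cloud Build](https://cloud.google.com/build) *[Optional]*

Learn about [Vertex AI Pricing](https://cloud.google.com/vertex-ai/pricing), [Cloud Storage Pricing](https://cloud.google.com/storage/pricing), and [Cloud Build Pricing](https://cloud.google.com/build/pricing), and use the [Pricing Calculator](https://cloud.google.com/products/calculator/) to generate a cost estimate based on your projected usage.

***
**NOTE:** This notebook does not require a GPU runtime. However, you must have sufficient GPU quota to run the GPU-backed jobs that the pipeline launches. Check the [quotas](https://console.cloud.google.com/iam-admin/quotas) page to make sure enough GPUs are available in your project. If GPUs are not listed on the quotas page, or you need additional GPU quota, [request a quota increase](https://cloud.google.com/compute/quotas#requesting_additional_quota). Free Trial accounts do not receive GPU quota by default.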

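### Set up your local development environment

**If you are using Colab or Google Cloud Notebooks**, your environment already meets all the requirements to run this notebook. You can skip this step.

Otherwise, make sure your environment meets this notebook's requirements.
You need the following:

* Google Cloud SDK
* Git
* Python 3
* virtualenv
* Jupyter notebook running in a virtual environment with Python 3

The Google Cloud guide to [setting up a Python development environment](https://cloud.google.com/python/setup) and the [Jupyter installation guide](https://jupyter.org/install) provide detailed instructions for meeting these requirements. The following steps provide a condensed set of instructions:

1. [Install and initialize the Cloud SDK.](https://cloud.google.com/sdk/docs/)
2. [Install Python 3.](https://cloud.google.com/python/setup#installing_python)
3. [Install virtualenv](https://cloud.google.com/python/setup#installing_and_using_virtualenv) and create a virtual environment that uses Python 3. Activate the virtual environment.
4. To install Jupyter, run `pip3 install jupyter` in a terminal shell.
5. To launch Jupyter, run `jupyter notebook` in a terminal shell.
6. Open this notebook in the Jupyter Notebook Dashboard.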

### Install additional packages

The following Python dependencies are required for this notebook and are installed in the notebook instance.

- [Kubeflow Pipelines v2 SDK](https://pypi.org/project/kfp/)
- [Google Cloud Pipeline Components](https://pypi.org/project/google-cloud-pipeline-components/)
- [Vertex AI SDK for Python](https://pypi.org/project/google-cloud-aiplatform/)

---
The notebook has been tested with the following versions of the Kubeflow Pipelines SDK and Google Cloud Pipeline Components:

```
kfp version: 1.8.10
google_cloud_pipeline_components version: 0.2.2
```
---

In [None]:
import os

# The Google Cloud Notebook product has specific requirements
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# Google Cloud Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_GOOGLE_CLOUD_NOTEBOOK:
    USER_FLAG = "--user"

In [None]:
!pip -q install {USER_FLAG} --upgrade kfp
!pip -q install {USER_FLAG} --upgrade google-cloud-pipeline-components

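Install the Vertex AI SDK for Python.

This notebook uses the [Vertex AI SDK for Python](https://cloud.google.com/vertex-ai/docs/start/client-libraries#python) to interact with Vertex AI services. The high-level `google-cloud-aiplatform` library is designed to simplify common data science workflows through wrapper classes and opinionated defaults.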

In [None]:
!pip -q install {USER_FLAG} --upgrade google-cloud-aiplatform

### Restart the kernel

After you install the additional packages, you need to restart the notebook kernel so that it can find them.

In [None]:
# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Check the versions of the packages you installed. The KFP SDK version should be >=1.6.
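
Comparing version strings as plain text can mislead, because `"1.10"` sorts before `"1.6"` as a string. The following is a minimal sketch of a numeric comparison; `parse_version` and `version_at_least` are hypothetical helper names, and the sketch assumes purely numeric dotted versions such as `1.8.10` (pre-release suffixes are not handled).

```python
def parse_version(version: str) -> tuple:
    """Convert a dotted numeric version such as '1.8.10' to (1, 8, 10)."""
    return tuple(int(part) for part in version.split("."))


def version_at_least(installed: str, minimum: str) -> bool:
    """Return True when `installed` is greater than or equal to `minimum`."""
    return parse_version(installed) >= parse_version(minimum)


# The tested KFP version from this notebook against the stated minimum.
print(version_at_least("1.8.10", "1.6"))
```

In the notebook you could pass `kfp.__version__` as the first argument.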

In [None]:
!python3 -c "import kfp; print('kfp version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

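Before you begin

This notebook does not require a GPU runtime.

### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.
1. [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project).
1. Enable the following APIs in your project, which are required to run this tutorial:
    - [Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com)
    - [Cloud Storage API](https://console.cloud.google.com/flows/enableapi?apiid=storage.googleapis.com)
    - [Container Registry API](https://console.cloud.google.com/flows/enableapi?apiid=containerregistry.googleapis.com)
    - [Cloud Build API](https://console.cloud.google.com/flows/enableapi?apiid=cloudbuild.googleapis.com)
1. If you are running this notebook locally, you need to install the [Cloud SDK](https://cloud.google.com/sdk).
1. Enter your project ID in the cell below. Then run the cell to make sure the Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$` into these commands.

#### Set your project ID

**If you don't know your project ID**, you may be able to get it using `gcloud`.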

In [None]:
PROJECT_ID = "[your-project-id]"  # <---CHANGE THIS TO YOUR PROJECT

In [None]:
import os

# Get your Google Cloud project ID using google.auth
if not os.getenv("IS_TESTING"):
    import google.auth

    _, PROJECT_ID = google.auth.default()
    print("Project ID: ", PROJECT_ID)

# validate PROJECT_ID
if PROJECT_ID == "" or PROJECT_ID is None or PROJECT_ID == "[your-project-id]":
    print(
        f"Please set your project id before proceeding to next step. Currently it's set as {PROJECT_ID}"
    )

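Timestamp

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on the resources created, you create a timestamp for each instance session and append it to the names of the resources you create in this tutorial.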

In [None]:
from datetime import datetime


def get_timestamp():
    return datetime.now().strftime("%Y%m%d%H%M%S")


TIMESTAMP = get_timestamp()
print(f"TIMESTAMP = {TIMESTAMP}")

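### Authenticate your Google Cloud account

---

**If you are using Google Cloud Notebooks**, your environment is already authenticated. Skip this step.

If you are using Colab, run the cell below and follow the instructions when prompted to authenticate your account via OAuth.

Otherwise, follow these steps:

1. In the Cloud Console, go to the [**Create service account key** page](https://console.cloud.google.com/apis/credentials/serviceaccountkey).
2. Click **Create service account**.
3. In the **Service account name** field, enter a name, and click **Create**.
4. In the **Grant this service account access to project** section, click the **Role** drop-down list. Type "Vertex AI" into the filter box, and select **Vertex AI Administrator**. Type "Storage Object Admin" into the filter box, and select **Storage Object Admin**.
5. Click *Create*. A JSON file that contains your key downloads to your local environment.
6. Enter the path to your service account key as the `GOOGLE_APPLICATION_CREDENTIALS` variable in the cell below and run the cell.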

In [None]:
import os
import sys

# If you are running this notebook in Colab, run this cell and follow the
# instructions to authenticate your GCP account. This provides access to your
# Cloud Storage bucket and lets you submit training jobs and prediction
# requests.

# The Google Cloud Notebook product has specific requirements
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# If on Google Cloud Notebooks, then don't execute this code
if not IS_GOOGLE_CLOUD_NOTEBOOK:
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''

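### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**

When you submit a training job using the Cloud SDK, you upload a Python package containing your training code to a Cloud Storage bucket. Vertex AI runs the code from this package. In this tutorial, Vertex AI also saves the trained model that results from your job in the same bucket. Using this model artifact, you can then create Vertex AI model and endpoint resources in order to serve online predictions.

Set the name of your Cloud Storage bucket below. It must be unique across all Cloud Storage buckets.

You may also change the `REGION` variable, which is used for operations throughout the rest of this notebook. Make sure to [choose a region where Vertex AI services are available](https://cloud.google.com/vertex-ai/docs/general/locations#available_regions). You may not use a Multi-Regional Storage bucket for training with Vertex AI.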

In [None]:
BUCKET_NAME = "gs://[your-bucket-name]"  # @param {type:"string"}
REGION = "us-central1"  # @param {type:"string"}

In [None]:
if BUCKET_NAME == "" or BUCKET_NAME is None or BUCKET_NAME == "gs://[your-bucket-name]":
    BUCKET_NAME = "gs://" + PROJECT_ID + "aip-" + TIMESTAMP

Run the following cell to create your Cloud Storage bucket only if the bucket does not already exist.

In [None]:
! gsutil mb -l $REGION $BUCKET_NAME

Finally, validate access to your Cloud Storage bucket by examining its contents.

In [None]:
! gsutil ls -al $BUCKET_NAME

### Import libraries and define constants

Import the Python libraries required to run the pipeline and define constants.

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import os
from typing import NamedTuple

import google_cloud_pipeline_components
import kfp
from google.cloud import aiplatform
from google.cloud.aiplatform import gapic as aip
from google.cloud.aiplatform import pipeline_jobs
from google.protobuf.json_format import MessageToDict
from google_cloud_pipeline_components import aiplatform as aip_components
from google_cloud_pipeline_components.experimental import custom_job
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import Input, Metrics, Model, Output, component

In [None]:
APP_NAME = "finetuned-bert-classifier"

In [None]:
PATH = %env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

# Pipeline root is the GCS path to store the artifacts from the pipeline runs
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/{APP_NAME}"

In [None]:
print(f"Kubeflow Pipelines SDK version = {kfp.__version__}")
print(
    f"Google Cloud Pipeline Components version = {google_cloud_pipeline_components.__version__}"
)
print(f"Pipeline Root = {PIPELINE_ROOT}")

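## High-Level Flow of Building a Pipeline

Following is the high-level flow to define and submit a pipeline on Vertex AI Pipelines:

1. Define the pipeline components involved in training and deploying a PyTorch model
2. Define a pipeline by stitching together the pre-built [Google Cloud Pipeline Components](https://cloud.google.com/vertex-ai/docs/pipelines/components-introduction) and custom components
3. Compile and submit the pipeline to the Vertex AI Pipelines service to run the workflow
4. Monitor the pipeline and analyze the metrics and artifacts generated

![High-level flow of a pipeline](./images/pipelines-high-level-flow.png)

This notebook builds on the training and serving code developed previously in this [notebook](../pytorch-text-classification-vertex-ai-train-tune-deploy.ipynb).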

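### Concepts of a Pipeline

Let's look at the terminology and concepts used in [Kubeflow Pipelines SDK v2](https://www.kubeflow.org/docs/components/pipelines/sdk-v2/).

![Concepts of a pipeline](./images/concepts-of-a-pipeline.png)

- **Component:** A component is a self-contained set of code that performs a single task in an ML workflow, for example, training a model. A component interface is composed of inputs, outputs, and a container image that the component's code runs in, including the executable code and the environment definition.
- **Pipeline:** A pipeline is composed of modular tasks defined as components that are chained together via inputs and outputs. The pipeline definition includes configuration such as the parameters required to run the pipeline. Each component in a pipeline executes independently, and the data (inputs and outputs) is passed between components in a serialized format.
- **Inputs & Outputs:** A component's inputs and outputs must be annotated with a data type, which makes each input or output either a parameter or an artifact.
    - **Parameters:** Parameters are inputs or outputs that support simple data types such as `str`, `int`, `float`, `bool`, `dict`, `list`. Input parameters are always passed by value between components and are stored in the [Vertex ML Metadata](https://cloud.google.com/vertex-ai/docs/ml-metadata/introduction) service.
    - **Artifacts:** Artifacts are references to the objects or files produced by pipeline runs that are passed as inputs or outputs. Artifacts support rich or larger data types such as datasets, models, metrics, and visualizations that are written as files or objects. An artifact is defined by a name, a URI, and metadata. The metadata is stored automatically in the Vertex ML Metadata service, and the actual content of the artifact refers to a path in a Cloud Storage bucket. Input artifacts are always passed by reference.

Learn more about KFP SDK v2 concepts [here](https://www.kubeflow.org/docs/components/pipelines/sdk-v2/).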
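
The difference between a parameter and an artifact can be sketched in plain Python. `ArtifactSketch` below is an illustrative stand-in, not the KFP artifact class, and the bucket name and `learning_rate` value are made up. The `path` property assumes the bucket is mounted under `/gcs/`, which is the same assumption the components in this notebook make when they swap `gs://` for `/gcs/`.

```python
from dataclasses import dataclass, field


@dataclass
class ArtifactSketch:
    """Illustrative stand-in for a pipeline artifact (not the KFP class)."""

    name: str
    uri: str
    metadata: dict = field(default_factory=dict)

    @property
    def path(self) -> str:
        # Assumption: Cloud Storage is mounted locally under /gcs/.
        if self.uri.startswith("gs://"):
            return "/gcs/" + self.uri[len("gs://"):]
        return self.uri


# A parameter is a simple value that is passed by value.
learning_rate = 2e-5

# An artifact is passed by reference: only its name, URI, and metadata travel.
model = ArtifactSketch(name="model", uri="gs://my-bucket/pipeline_root/model")
model.metadata["framework"] = "pytorch"
print(model.path)
```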

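### Pipeline schematic

Following is the high-level pipeline schematic for the PyTorch-based text classification model, with the tasks involved in the pipeline and their inputs and outputs:

![Pipeline schematic for PyTorch text classification model](./images/pipeline-schematic-pytorch-text-classification.png)

- **Build custom training image:** This step builds a custom training container image from the training application code, the associated Dockerfile, and its dependencies. The output of this step is the Container or Artifact Registry URI of the custom training container.
- **Run the custom training job to train and evaluate the model:** This step downloads and preprocesses the training data of the IMDB sentiment classification dataset from HuggingFace, then trains and evaluates the model on the custom training container from the previous step. The outputs of this step are the Cloud Storage path to the trained model artifacts and the model performance metrics.
- **Package model artifacts:** This step packages the trained model artifacts, including the custom prediction handler, with the Torch Model Archiver tool to create a model archive (.mar) file. The output of this step is the location of the model archive (.mar) file on GCS.
- **Build custom serving image:** This step builds a custom serving container that runs the TorchServe HTTP server to serve prediction requests for the mounted model. The output of this step is the Container or Artifact Registry URI of the custom serving container.
- **Upload model with custom serving container:** This step creates a model resource using the custom serving image and the MAR file from the previous steps.
- **Create an endpoint:** This step creates a Vertex AI endpoint, which provides a service URL where prediction requests are sent.
- **Deploy model to endpoint for serving:** This step deploys the model to the created endpoint and creates the compute resources (based on the configured machine spec) needed to serve online prediction requests.
- **Validate deployment:** This step sends test requests to the endpoint and validates the deployment.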
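
The ordering implied by these steps can be sketched as a dependency graph. The step names and edges below are assumptions inferred from the descriptions above, not taken from the pipeline specification, and `graphlib` requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of steps that must finish before it can start.
# The edges are inferred from the step descriptions (an assumption).
dependencies = {
    "build_train_image": set(),
    "train_and_evaluate": {"build_train_image"},
    "package_model": {"train_and_evaluate"},
    "build_serving_image": set(),
    "upload_model": {"package_model", "build_serving_image"},
    "create_endpoint": set(),
    "deploy_model": {"upload_model", "create_endpoint"},
    "validate_deployment": {"deploy_model"},
}

# static_order() yields every step after all of its prerequisites.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Steps with no edge between them, such as building the serving image and running the training job, have no ordering constraint in this sketch, so an orchestrator could in principle run them in parallel.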

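## Define the Pipeline Components for PyTorch based ML Workflow

The pipeline uses a mix of pre-built components from the [Google Cloud Pipeline Components SDK](https://cloud.google.com/vertex-ai/docs/pipelines/components-introduction), which interact with Google Cloud services such as Vertex AI, and custom components defined for some of the steps in the pipeline. This section of the notebook defines the custom components that perform tasks in the pipeline, using the [KFP SDK v2 component spec](https://www.kubeflow.org/docs/components/pipelines/sdk-v2/component-development/).

Create a local pipelines directory to save the component and pipeline specifications.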

In [None]:
!mkdir -p ./pipelines

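### 1. Component: Build Custom Training Container Image

This step builds a custom training container image using Cloud Build. The build job pulls the training application code and the associated `Dockerfile` from a GCS location, then builds the custom training container image and pushes it to Container Registry.

- **Inputs**: The input to this component is the GCS path of the training application code and the `Dockerfile`.
- **Outputs**: The output of this step is the Container or Artifact Registry URI of the custom training container.

Create a `Dockerfile` that uses a PyTorch GPU image as the base, installs the required dependencies, and copies the training application code.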

In [None]:
%%writefile ./custom_container/Dockerfile

# Use pytorch GPU base image
# FROM gcr.io/cloud-aiplatform/training/pytorch-gpu.1-7
FROM us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-10:latest

# set working directory
WORKDIR /app

# Install required packages
RUN pip install google-cloud-storage transformers datasets tqdm cloudml-hypertune

# Copies the trainer code to the docker image.
COPY ./trainer/__init__.py /app/trainer/__init__.py
COPY ./trainer/experiment.py /app/trainer/experiment.py
COPY ./trainer/utils.py /app/trainer/utils.py
COPY ./trainer/metadata.py /app/trainer/metadata.py
COPY ./trainer/model.py /app/trainer/model.py
COPY ./trainer/task.py /app/trainer/task.py

# Set up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

Copy the training application code and the `Dockerfile` from the local path to the GCS location

In [None]:
# copy training Dockerfile
!gsutil cp ./custom_container/Dockerfile {BUCKET_NAME}/{APP_NAME}/train/

# copy training application code
!gsutil cp -r ./python_package/trainer/ {BUCKET_NAME}/{APP_NAME}/train/

# list copied files from GCS location
!gsutil ls -Rl {BUCKET_NAME}/{APP_NAME}/train/

print(
    f"Copied training application code and Dockerfile to {BUCKET_NAME}/{APP_NAME}/train/"
)

Define the custom pipeline component to build the custom training container

In [None]:
@component(
    base_image="gcr.io/google.com/cloudsdktool/cloud-sdk:latest",
    packages_to_install=["google-cloud-build"],
    output_component_file="./pipelines/build_custom_train_image.yaml",
)
def build_custom_train_image(
    project: str, gs_train_src_path: str, training_image_uri: str
) -> NamedTuple("Outputs", [("training_image_uri", str)]):
    """custom pipeline component to build custom training image using
    Cloud Build and the training application code and dependencies
    defined in the Dockerfile
    """

    import logging
    import os

    from google.cloud.devtools import cloudbuild_v1 as cloudbuild
    from google.protobuf.duration_pb2 import Duration

    # initialize client for cloud build
    logging.getLogger().setLevel(logging.INFO)
    build_client = cloudbuild.services.cloud_build.CloudBuildClient()

    # parse step inputs to get path to Dockerfile and training application code
    gs_dockerfile_path = os.path.join(gs_train_src_path, "Dockerfile")
    gs_train_src_path = os.path.join(gs_train_src_path, "trainer/")

    logging.info(f"training_image_uri: {training_image_uri}")

    # define build steps to pull the training code and Dockerfile
    # and build/push the custom training container image
    build = cloudbuild.Build()
    build.steps = [
        {
            "name": "gcr.io/cloud-builders/gsutil",
            "args": ["cp", "-r", gs_train_src_path, "."],
        },
        {
            "name": "gcr.io/cloud-builders/gsutil",
            "args": ["cp", gs_dockerfile_path, "Dockerfile"],
        },
        # enabling Kaniko cache in a Docker build that caches intermediate
        # layers and pushes image automatically to Container Registry
        # https://cloud.google.com/build/docs/kaniko-cache
        {
            "name": "gcr.io/kaniko-project/executor:latest",
            "args": [f"--destination={training_image_uri}", "--cache=true"],
        },
    ]
    # override default timeout of 10min
    timeout = Duration()
    timeout.seconds = 7200
    build.timeout = timeout

    # create build
    operation = build_client.create_build(project_id=project, build=build)
    logging.info("IN PROGRESS:")
    logging.info(operation.metadata)

    # get build status
    result = operation.result()
    logging.info("RESULT: %s", result.status)

    # return step outputs
    return (training_image_uri,)

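There are a few things to note about the component specification:
- The standalone function is converted into a pipeline component by the [`@kfp.v2.dsl.component`](https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/v2/components/component_decorator.py) decorator.
- All arguments of the standalone function must have data type annotations, because KFP uses the function's inputs and outputs to define the component's interface.
- By default, Python 3.7 is used as the base image to run the defined code. You can [configure the `@component` decorator](https://www.kubeflow.org/docs/components/pipelines/sdk-v2/python-function-components/#building-python-function-based-components) to override the default image by specifying `base_image`, install additional Python packages with the `packages_to_install` parameter, and write the compiled component to a YAML file with `output_component_file` so that the component can be shared or reused.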
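
To see what information type annotations expose, the sketch below inspects a plain function with the standard library. It is not how KFP is implemented; `build_image_stub` and its parameter names are hypothetical, and the function builds nothing.

```python
from typing import NamedTuple, get_type_hints


def build_image_stub(
    project: str, source_path: str, image_uri: str
) -> NamedTuple("Outputs", [("image_uri", str)]):
    """Hypothetical stand-in for a component function; it builds nothing."""
    return (image_uri,)


# Annotations carry the names and types of every input and output.
hints = get_type_hints(build_image_stub)
outputs = hints.pop("return")

print("inputs:", {name: hint.__name__ for name, hint in hints.items()})
print("outputs:", outputs._fields)
```

A function without annotations would expose none of this, which is consistent with the requirement that every argument be annotated.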

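### 2. Component: Get Custom Training Job Details from Vertex AI

This step gets the details of the custom training job from Vertex AI, including the training elapsed time and the model performance metrics, which are used in the next step before model deployment. The step also creates a [Model](https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/v2/components/types/artifact_types.py#L77) artifact that contains the trained model artifacts.

**NOTE:** The pre-built [custom job component](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.1/google_cloud_pipeline_components.experimental.custom_job.html) used in the pipeline outputs the CustomJob resource but not the model artifacts.

- **Inputs**:
    - **`job_resource`:** Custom job resource returned by the pre-built CustomJob component
    - **`project`:** Project ID where the job ran
    - **`location`:** Region where the job ran
    - **`eval_metric_key`:** Evaluation metric key name, such as eval_accuracy
    - **`model_display_name`:** Model display name under which the model artifacts are saved

- **Outputs**:
    - **`model`:** Trained model artifact created by the training job, with added model metadata
    - **`metrics`:** Model performance metrics captured from the training job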
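
The component defined below derives the custom job name by keeping the last six path segments of the resource URI. A minimal sketch of that parsing follows; `resource_name_from_uri` is a hypothetical helper, and the URI, project number, and job ID are placeholders chosen for illustration.

```python
def resource_name_from_uri(resource_uri: str, segments: int = 6) -> str:
    """Keep the trailing path segments that form the resource name."""
    return "/".join(resource_uri.split("/")[-segments:])


# Hypothetical URI; the project number and job ID are placeholders.
uri = (
    "https://us-central1-aiplatform.googleapis.com/v1/"
    "projects/123/locations/us-central1/customJobs/456"
)
print(resource_name_from_uri(uri))
```

The result has the `projects/.../locations/.../customJobs/...` form that the job client accepts as the `name` argument in the component.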

In [None]:
@component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-pipeline-components",
        "google-cloud-aiplatform",
        "pandas",
        "fsspec",
    ],
    output_component_file="./pipelines/get_training_job_details.yaml",
)
def get_training_job_details(
    project: str,
    location: str,
    job_resource: str,
    eval_metric_key: str,
    model_display_name: str,
    metrics: Output[Metrics],
    model: Output[Model],
) -> NamedTuple(
    "Outputs", [("eval_metric", float), ("eval_loss", float), ("model_artifacts", str)]
):
    """custom pipeline component to get model artifacts and performance
    metrics from custom training job
    """
    import logging
    import shutil
    from collections import namedtuple

    import pandas as pd
    from google.cloud.aiplatform import gapic as aip
    from google.protobuf.json_format import Parse
    from google_cloud_pipeline_components.proto.gcp_resources_pb2 import \
        GcpResources

    # parse training job resource
    logging.info(f"Custom job resource = {job_resource}")
    training_gcp_resources = Parse(job_resource, GcpResources())
    custom_job_id = training_gcp_resources.resources[0].resource_uri
    custom_job_name = "/".join(custom_job_id.split("/")[-6:])
    logging.info(f"Custom job name parsed = {custom_job_name}")

    # get custom job information
    API_ENDPOINT = "{}-aiplatform.googleapis.com".format(location)
    client_options = {"api_endpoint": API_ENDPOINT}
    job_client = aip.JobServiceClient(client_options=client_options)
    job_resource = job_client.get_custom_job(name=custom_job_name)
    job_base_dir = job_resource.job_spec.base_output_directory.output_uri_prefix
    logging.info(f"Custom job base output directory = {job_base_dir}")

    # copy model artifacts
    logging.info(f"Copying model artifacts to {model.path}")
    destination = shutil.copytree(job_base_dir.replace("gs://", "/gcs/"), model.path)
    logging.info(destination)
    logging.info(f"Model artifacts located at {model.uri}/model/{model_display_name}")
    logging.info(f"Model artifacts located at model.uri = {model.uri}")

    # set model metadata
    start, end = job_resource.start_time, job_resource.end_time
    model.metadata["model_name"] = model_display_name
    model.metadata["framework"] = "pytorch"
    model.metadata["job_name"] = custom_job_name
    model.metadata["time_to_train_in_seconds"] = (end - start).total_seconds()

    # fetch metrics from the training job run
    metrics_uri = f"{model.path}/model/{model_display_name}/all_results.json"
    logging.info(f"Reading and logging metrics from {metrics_uri}")
    metrics_df = pd.read_json(metrics_uri, typ="series")
    for k, v in metrics_df.items():
        logging.info(f"     {k} -> {v}")
        metrics.log_metric(k, v)

    # capture eval metric and log to model metadata
    eval_metric = (
        metrics_df[eval_metric_key] if eval_metric_key in metrics_df.keys() else None
    )
    eval_loss = metrics_df["eval_loss"] if "eval_loss" in metrics_df.keys() else None
    logging.info(f"     {eval_metric_key} -> {eval_metric}")
    logging.info(f'     "eval_loss" -> {eval_loss}')

    model.metadata[eval_metric_key] = eval_metric
    model.metadata["eval_loss"] = eval_loss

    # return output parameters
    outputs = namedtuple("Outputs", ["eval_metric", "eval_loss", "model_artifacts"])

    return outputs(eval_metric, eval_loss, job_base_dir)

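### 3. Component: Create Model Archive (MAR) file using Torch Model Archiver

This step uses the [Torch Model Archiver](https://github.com/pytorch/serve/tree/master/model-archiver) tool to package the trained model artifacts and the custom prediction handler (defined in the earlier notebook) as a model archive (.mar) file.

- **Inputs**:
    - **`model_display_name`:** Model display name used to save the model archive file
    - **`model_version`:** Model version used to save the model archive file
    - **`handler`:** Location of the custom prediction handler
    - **`model`:** Trained model artifact from the previous step

- **Outputs**:
    - **`model_mar`**: Packaged model archive file (artifact) on GCS
    - **`mar_env_var`**: List of environment variables required to create the model resource
    - **`mar_export_uri`**: GCS path to the model archive file

Copy the custom prediction handler code from the local path to the GCS location

**NOTE**: The custom prediction handler is defined in the [previous notebook](./pytorch-text-classification-vertex-ai-train-tune-deploy.ipynb)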
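
The component in this section assembles a `torch-model-archiver` command as a single shell string. As an alternative sketch, the same flags can be collected in an argument list, which avoids shell quoting issues when a path contains spaces. `build_archiver_args` is a hypothetical helper and the paths are placeholders; the flags are the ones the component uses.

```python
import shlex


def build_archiver_args(config: dict) -> list:
    """Assemble torch-model-archiver arguments from a config mapping."""
    args = [
        "torch-model-archiver", "--force",
        "--model-name", config["MODEL_NAME"],
        "--serialized-file", config["SERIALIZED_FILE"],
        "--handler", config["HANDLER"],
        "--version", config["VERSION"],
    ]
    # Optional flags are added only when the config provides a value.
    optional_flags = {
        "EXPORT_PATH": "--export-path",
        "EXTRA_FILES": "--extra-files",
        "REQUIREMENTS_FILE": "--requirements-file",
    }
    for key, flag in optional_flags.items():
        if key in config:
            args += [flag, config[key]]
    return args


# Placeholder paths for illustration only.
config = {
    "MODEL_NAME": "finetuned-bert-classifier",
    "SERIALIZED_FILE": "/gcs/my-bucket/model/pytorch_model.bin",
    "HANDLER": "/gcs/my-bucket/serve/predictor/custom_handler.py",
    "VERSION": "1",
    "EXPORT_PATH": "/gcs/my-bucket/model-store",
}
args = build_archiver_args(config)
print(shlex.join(args))
```

A list like this can be passed to `subprocess.run(args, check=True)` without `shell=True`, so a failure is signalled by the exit code.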

In [None]:
# copy custom prediction handler
!gsutil cp ./predictor/custom_handler.py ./predictor/index_to_name.json {BUCKET_NAME}/{APP_NAME}/serve/predictor/

# list copied files from GCS location
!gsutil ls -lR {BUCKET_NAME}/{APP_NAME}/serve/

print(f"Copied custom prediction handler code to {BUCKET_NAME}/{APP_NAME}/serve/")

Define the custom pipeline component to create the model archive file

In [None]:
@component(
    base_image="python:3.9",
    packages_to_install=["torch-model-archiver"],
    output_component_file="./pipelines/generate_mar_file.yaml",
)
def generate_mar_file(
    model_display_name: str,
    model_version: str,
    handler: str,
    model: Input[Model],
    model_mar: Output[Model],
) -> NamedTuple("Outputs", [("mar_env_var", list), ("mar_export_uri", str)]):
    """custom pipeline component to package model artifacts and custom
    handler to a model archive file using Torch Model Archiver tool
    """
    import logging
    import os
    import subprocess
    import time
    from collections import namedtuple
    from pathlib import Path

    logging.getLogger().setLevel(logging.INFO)

    # create directory to save model archive file
    model_output_root = model.path
    mar_output_root = model_mar.path
    export_path = f"{mar_output_root}/model-store"
    try:
        Path(export_path).mkdir(parents=True, exist_ok=True)
    except Exception as e:
        logging.warning(e)
        # retry after pause
        time.sleep(2)
        Path(export_path).mkdir(parents=True, exist_ok=True)

    # parse and configure paths for model archive config
    handler_path = (
        handler.replace("gs://", "/gcs/") + "predictor/custom_handler.py"
        if handler.startswith("gs://")
        else handler
    )
    model_artifacts_dir = f"{model_output_root}/model/{model_display_name}"
    extra_files = [
        os.path.join(model_artifacts_dir, f)
        for f in os.listdir(model_artifacts_dir)
        if f != "pytorch_model.bin"
    ]

    # define model archive config
    mar_config = {
        "MODEL_NAME": model_display_name,
        "HANDLER": handler_path,
        "SERIALIZED_FILE": f"{model_artifacts_dir}/pytorch_model.bin",
        "VERSION": model_version,
        "EXTRA_FILES": ",".join(extra_files),
        "EXPORT_PATH": f"{model_mar.path}/model-store",
    }

    # generate model archive command
    archiver_cmd = (
        "torch-model-archiver --force "
        f"--model-name {mar_config['MODEL_NAME']} "
        f"--serialized-file {mar_config['SERIALIZED_FILE']} "
        f"--handler {mar_config['HANDLER']} "
        f"--version {mar_config['VERSION']}"
    )
    if "EXPORT_PATH" in mar_config:
        archiver_cmd += f" --export-path {mar_config['EXPORT_PATH']}"
    if "EXTRA_FILES" in mar_config:
        archiver_cmd += f" --extra-files {mar_config['EXTRA_FILES']}"
    if "REQUIREMENTS_FILE" in mar_config:
        archiver_cmd += f" --requirements-file {mar_config['REQUIREMENTS_FILE']}"

    # run archiver command
    logging.warning("Running archiver command: %s", archiver_cmd)
    with subprocess.Popen(
        archiver_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    ) as p:
        _, err = p.communicate()
        if err:
            raise ValueError(err)

    # set output variables
    mar_env_var = [{"name": "MODEL_NAME", "value": model_display_name}]
    mar_export_uri = f"{model_mar.uri}/model-store/"

    outputs = namedtuple("Outputs", ["mar_env_var", "mar_export_uri"])
    return outputs(mar_env_var, mar_export_uri)

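### 4. Component: Build Custom Serving Container Running TorchServe

This step builds a [custom serving container](https://cloud.google.com/vertex-ai/docs/predictions/custom-container-requirements) that runs the [TorchServe](https://pytorch.org/serve/) HTTP server to serve prediction requests for the mounted model. The output of this step is the Container Registry URI of the custom serving container.

- **Inputs**:
    - **`project`:** Project ID to run
    - **`serving_image_uri`:** URI of the custom serving container in Container Registry
    - **`gs_serving_dependencies_path`:** Location of the serving dependencies (the Dockerfile)
- **Outputs**:
    - **`serving_image_uri`:** URI of the custom serving container in Container Registry

Create a `Dockerfile` that uses the TorchServe CPU image as the base, installs the required dependencies, and runs the TorchServe serve command.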

In [None]:
%%bash -s $APP_NAME

APP_NAME=$1

cat << EOF > ./predictor/Dockerfile.serve

FROM pytorch/torchserve:latest-cpu

USER root
# update and install some basic software packages, including security libs
RUN apt-get update && \
    apt-get install -y software-properties-common && \
    add-apt-repository -y ppa:ubuntu-toolchain-r/test && \
    apt-get update && \
    apt-get install -y gcc-9 g++-9 apt-transport-https ca-certificates gnupg curl

# Install gcloud tools for gsutil as well as debugging
RUN echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] http://packages.cloud.google.com/apt cloud-sdk main" | \
    tee -a /etc/apt/sources.list.d/google-cloud-sdk.list && \
    curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | \
    apt-key --keyring /usr/share/keyrings/cloud.google.gpg add - && \
    apt-get update -y && \
    apt-get install google-cloud-sdk -y

USER model-server

# install dependencies
RUN python3 -m pip install --upgrade pip
RUN pip3 install transformers

ARG MODEL_NAME=$APP_NAME
ENV MODEL_NAME="\${MODEL_NAME}"

# health and prediction listener ports
ARG AIP_HTTP_PORT=7080
ENV AIP_HTTP_PORT="\${AIP_HTTP_PORT}"

ARG MODEL_MGMT_PORT=7081

# expose health and prediction listener ports from the image
EXPOSE "\${AIP_HTTP_PORT}"
EXPOSE "\${MODEL_MGMT_PORT}"
EXPOSE 8080 8081 8082 7070 7071

# create torchserve configuration file
USER root
RUN echo "service_envelope=json\n" \
    "inference_address=http://0.0.0.0:\${AIP_HTTP_PORT}\n" \
    "management_address=http://0.0.0.0:\${MODEL_MGMT_PORT}" >> \
    /home/model-server/config.properties
USER model-server

# run Torchserve HTTP serve to respond to prediction requests
CMD ["echo", "AIP_STORAGE_URI=\${AIP_STORAGE_URI}", ";", \
    "gsutil", "cp", "-r", "\${AIP_STORAGE_URI}/\${MODEL_NAME}.mar", "/home/model-server/model-store/", ";", \
    "ls", "-ltr", "/home/model-server/model-store/", ";", \
    "torchserve", "--start", "--ts-config=/home/model-server/config.properties", \
    "--models", "\${MODEL_NAME}=\${MODEL_NAME}.mar", \
    "--model-store", "/home/model-server/model-store"]
EOF

echo "Writing ./predictor/Dockerfile.serve"

Copy the serving `Dockerfile` from the local path to the GCS location.

In [None]:
# copy serving Dockerfile
!gsutil cp ./predictor/Dockerfile.serve {BUCKET_NAME}/{APP_NAME}/serve/

# list copied files from GCS location
!gsutil ls -lR {BUCKET_NAME}/{APP_NAME}/serve/

print(f"Copied serving Dockerfile to {BUCKET_NAME}/{APP_NAME}/serve/")

Define the custom pipeline component to build the custom serving container

In [None]:
@component(
    base_image="python:3.9",
    packages_to_install=["google-cloud-build"],
    output_component_file="./pipelines/build_custom_serving_image.yaml",
)
def build_custom_serving_image(
    project: str, gs_serving_dependencies_path: str, serving_image_uri: str
) -> NamedTuple("Outputs", [("serving_image_uri", str)],):
    """custom pipeline component to build custom serving image using
    Cloud Build and dependencies defined in the Dockerfile
    """
    import logging
    import os

    from google.cloud.devtools import cloudbuild_v1 as cloudbuild
    from google.protobuf.duration_pb2 import Duration

    logging.getLogger().setLevel(logging.INFO)
    build_client = cloudbuild.services.cloud_build.CloudBuildClient()

    logging.info(f"gs_serving_dependencies_path: {gs_serving_dependencies_path}")
    gs_dockerfile_path = os.path.join(gs_serving_dependencies_path, "Dockerfile.serve")

    logging.info(f"serving_image_uri: {serving_image_uri}")
    build = cloudbuild.Build()
    build.steps = [
        {
            "name": "gcr.io/cloud-builders/gsutil",
            "args": ["cp", gs_dockerfile_path, "Dockerfile"],
        },
        # enabling Kaniko cache in a Docker build that caches intermediate
        # layers and pushes image automatically to Container Registry
        # https://cloud.google.com/build/docs/kaniko-cache
        {
            "name": "gcr.io/kaniko-project/executor:latest",
            "args": [f"--destination={serving_image_uri}", "--cache=true"],
        },
    ]
    # override default timeout of 10min
    timeout = Duration()
    timeout.seconds = 7200
    build.timeout = timeout

    # create build
    operation = build_client.create_build(project_id=project, build=build)
    logging.info("IN PROGRESS:")
    logging.info(operation.metadata)

    # get build status
    result = operation.result()
    logging.info("RESULT: %s", result.status)

    # return step outputs
    return (serving_image_uri,)

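### 5. Component: Test model deployment by making an online prediction request

This step validates the deployment by sending test prediction requests to the Vertex AI endpoint. The deployment is considered successful when the model server returns the sentiment of the text.

- **Inputs**:
    - **`project`:** Project ID to run
    - **`bucket`:** Staging GCS bucket path
    - **`endpoint`:** Location of the Vertex AI endpoint, from the endpoint creation task
    - **`instances`:** List of test prediction requests
- **Outputs**:
    - None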
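
Each test instance is sent as base64-encoded text under a `data.b64` key, as the component below does. The following sketch builds one such instance with the standard library only; `build_instance` is a hypothetical helper and the review text is made up.

```python
import base64
import json


def build_instance(text: str) -> dict:
    """Wrap UTF-8 text as a base64 payload under data.b64."""
    encoded = base64.b64encode(text.encode("utf-8")).decode("utf-8")
    return {"data": {"b64": encoded}}


instance = build_instance("A gripping film from start to finish.")
print(json.dumps(instance))

# Decoding the payload recovers the original text.
decoded = base64.b64decode(instance["data"]["b64"]).decode("utf-8")
print(decoded)
```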

In [None]:
@component(
    base_image="python:3.9",
    packages_to_install=["google-cloud-aiplatform", "google-cloud-pipeline-components"],
    output_component_file="./pipelines/make_prediction_request.yaml",
)
def make_prediction_request(project: str, bucket: str, endpoint: str, instances: list):
    """custom pipeline component to pass prediction requests to Vertex AI
    endpoint and get responses
    """
    import base64
    import logging

    from google.cloud import aiplatform
    from google.protobuf.json_format import Parse
    from google_cloud_pipeline_components.proto.gcp_resources_pb2 import \
        GcpResources

    logging.getLogger().setLevel(logging.INFO)
    aiplatform.init(project=project, staging_bucket=bucket)

    # parse endpoint resource
    logging.info(f"Endpoint = {endpoint}")
    gcp_resources = Parse(endpoint, GcpResources())
    endpoint_uri = gcp_resources.resources[0].resource_uri
    endpoint_id = "/".join(endpoint_uri.split("/")[-8:-2])
    logging.info(f"Endpoint ID = {endpoint_id}")

    # define endpoint client
    _endpoint = aiplatform.Endpoint(endpoint_id)

    # call prediction endpoint for each instance
    for instance in instances:
        if not isinstance(instance, (bytes, bytearray)):
            instance = instance.encode()
        logging.info(f"Input text: {instance.decode('utf-8')}")
        b64_encoded = base64.b64encode(instance)
        test_instance = [{"data": {"b64": b64_encoded.decode("utf-8")}}]
        response = _endpoint.predict(instances=test_instance)
        logging.info(f"Prediction response: {response.predictions}")
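
The request body and the endpoint ID parsing used by the component above can be checked locally without calling Vertex AI. The following is a minimal sketch: the helper names `build_test_instance` and `endpoint_id_from_uri` and the sample URI are hypothetical and only mirror the logic in `make_prediction_request`, assuming the resource URI ends with `/operations/<id>`.

```python
import base64


def build_test_instance(text):
    """Wrap one input text in the base64 payload sent to the endpoint."""
    raw = text if isinstance(text, (bytes, bytearray)) else text.encode("utf-8")
    return [{"data": {"b64": base64.b64encode(raw).decode("utf-8")}}]


def endpoint_id_from_uri(resource_uri):
    """Keep the projects/.../endpoints/... part of a deploy operation URI."""
    return "/".join(resource_uri.split("/")[-8:-2])


# Hypothetical URI, shaped the way the component's slice [-8:-2] assumes
sample_uri = (
    "https://us-central1-aiplatform.googleapis.com/v1/projects/123"
    "/locations/us-central1/endpoints/456/operations/789"
)
payload = build_test_instance("Great movie!")
print(payload)
print(endpoint_id_from_uri(sample_uri))
```

If the URI has a different number of path segments, the slice returns a wrong resource name, so logging the parsed ID (as the component does) is a useful sanity check.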

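## Define the pipeline specification

The pipeline definition describes the input and output parameters and how artifacts are passed between steps.

#### Set environment variables

These environment variables are used to define resource specifications such as training jobs and model resources.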

In [None]:
os.environ["PROJECT_ID"] = PROJECT_ID
os.environ["BUCKET"] = BUCKET_NAME
os.environ["REGION"] = REGION
os.environ["APP_NAME"] = APP_NAME

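#### Create the pipeline configuration file

The pipeline configuration file helps templatize the pipeline, so the same pipeline can be run with different parameters.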

In [None]:
%%writefile ./pipelines/pipeline_config.py

import os
from datetime import datetime

PROJECT_ID = os.getenv("PROJECT_ID", "")
BUCKET = os.getenv("BUCKET", "")
REGION = os.getenv("REGION", "us-central1")

APP_NAME = os.getenv("APP_NAME", "finetuned-bert-classifier")
VERSION = datetime.now().strftime("%Y%m%d%H%M%S")
MODEL_NAME = APP_NAME
MODEL_DISPLAY_NAME = f"{MODEL_NAME}-{VERSION}"

PIPELINE_NAME = f"pytorch-{APP_NAME}"
PIPELINE_ROOT = f"{BUCKET}/pipeline_root/{MODEL_NAME}"
GCS_STAGING = f"{BUCKET}/pipeline_root/{MODEL_NAME}"

TRAIN_IMAGE_URI = f"gcr.io/{PROJECT_ID}/pytorch_gpu_train_{MODEL_NAME}"
SERVE_IMAGE_URI = f"gcr.io/{PROJECT_ID}/pytorch_cpu_predict_{MODEL_NAME}"

MACHINE_TYPE = "n1-standard-8"
REPLICA_COUNT = "1"
ACCELERATOR_TYPE = "NVIDIA_TESLA_T4"
ACCELERATOR_COUNT = "1"
NUM_WORKERS = 1

SERVING_HEALTH_ROUTE = "/ping"
SERVING_PREDICT_ROUTE = f"/predictions/{MODEL_NAME}"
SERVING_CONTAINER_PORT = [{"containerPort": 7080}]
SERVING_MACHINE_TYPE = "n1-standard-4"
SERVING_MIN_REPLICA_COUNT = 1
SERVING_MAX_REPLICA_COUNT = 1
SERVING_TRAFFIC_SPLIT = '{"0": 100}'
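
To illustrate how the environment variables parametrize the derived names in the configuration file, here is a minimal sketch. The `derive_names` helper and the placeholder project and bucket values are hypothetical; the f-string patterns are copied from the configuration file above.

```python
def derive_names(env):
    """Compute a few derived names the same way pipeline_config.py does."""
    project_id = env.get("PROJECT_ID", "")
    bucket = env.get("BUCKET", "")
    app_name = env.get("APP_NAME", "finetuned-bert-classifier")
    return {
        "PIPELINE_NAME": f"pytorch-{app_name}",
        "PIPELINE_ROOT": f"{bucket}/pipeline_root/{app_name}",
        "TRAIN_IMAGE_URI": f"gcr.io/{project_id}/pytorch_gpu_train_{app_name}",
        "SERVE_IMAGE_URI": f"gcr.io/{project_id}/pytorch_cpu_predict_{app_name}",
        "SERVING_PREDICT_ROUTE": f"/predictions/{app_name}",
    }


# Placeholder values for illustration only
default_run = derive_names({"PROJECT_ID": "my-project", "BUCKET": "gs://my-bucket"})
custom_run = derive_names(
    {"PROJECT_ID": "my-project", "BUCKET": "gs://my-bucket", "APP_NAME": "bert-v2"}
)
print(default_run["PIPELINE_NAME"])
print(custom_run["PIPELINE_NAME"])
```

Because the configuration module reads the environment when it is imported, the variables must be set before `pipeline_config` is imported for the new values to take effect.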

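#### Define the pipeline specification

The pipeline is defined as a standalone Python function annotated with the [`@kfp.dsl.pipeline`](https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/v2/components/pipeline_context.py) decorator, which specifies the pipeline's name and the root path where the pipeline's artifacts are stored.

The pipeline definition consists of both pre-built and custom-defined components:
- Pre-built components from the [Google Cloud Pipeline Components SDK](https://cloud.google.com/vertex-ai/docs/pipelines/components-introduction) are used for tasks that call Vertex AI services, such as submitting a custom training job (`custom_job.CustomTrainingJobOp`), uploading a model (`ModelUploadOp`), creating an endpoint (`EndpointCreateOp`), and deploying a model to the endpoint (`ModelDeployOp`).
- Custom components are used for building the custom training container (`build_custom_train_image`), getting training job details (`get_training_job_details`), creating the MAR file (`generate_mar_file`), building the serving container (`build_custom_serving_image`), and validating the model deployment (`make_prediction_request`). Refer to the notebook for the custom component specifications of these tasks.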

In [None]:
from pipelines import pipeline_config as cfg


@dsl.pipeline(
    name=cfg.PIPELINE_NAME,
    pipeline_root=cfg.PIPELINE_ROOT,
)
def pytorch_text_classifier_pipeline(
    pipeline_job_id: str,
    gs_train_script_path: str,
    gs_serving_dependencies_path: str,
    eval_acc_threshold: float,
    is_hp_tuning_enabled: str = "n",
):
    # ========================================================================
    # build custom training container image
    # ========================================================================
    # build custom container for training job passing the
    # GCS location of the training application code
    build_custom_train_image_task = (
        build_custom_train_image(
            project=cfg.PROJECT_ID,
            gs_train_src_path=gs_train_script_path,
            training_image_uri=cfg.TRAIN_IMAGE_URI,
        )
        .set_caching_options(True)
        .set_display_name("Build custom training image")
    )

    # ========================================================================
    # model training
    # ========================================================================
    # train the model on Vertex AI by submitting a CustomJob
    # using the custom container (no hyper-parameter tuning)
    # define training code arguments
    training_args = ["--num-epochs", "2", "--model-name", cfg.MODEL_NAME]
    # define job name
    JOB_NAME = f"{cfg.MODEL_NAME}-train-pytorch-cstm-cntr-{TIMESTAMP}"
    GCS_BASE_OUTPUT_DIR = f"{cfg.GCS_STAGING}/{TIMESTAMP}"
    # define worker pool specs
    worker_pool_specs = [
        {
            "machine_spec": {
                "machine_type": cfg.MACHINE_TYPE,
                "accelerator_type": cfg.ACCELERATOR_TYPE,
                "accelerator_count": cfg.ACCELERATOR_COUNT,
            },
            "replica_count": cfg.REPLICA_COUNT,
            "container_spec": {"image_uri": cfg.TRAIN_IMAGE_URI, "args": training_args},
        }
    ]

    run_train_task = (
        custom_job.CustomTrainingJobOp(
            project=cfg.PROJECT_ID,
            location=cfg.REGION,
            display_name=JOB_NAME,
            base_output_directory=GCS_BASE_OUTPUT_DIR,
            worker_pool_specs=worker_pool_specs,
        )
        .set_display_name("Run custom training job")
        .after(build_custom_train_image_task)
    )

    # ========================================================================
    # get training job details
    # ========================================================================
    training_job_details_task = get_training_job_details(
        project=cfg.PROJECT_ID,
        location=cfg.REGION,
        job_resource=run_train_task.output,
        eval_metric_key="eval_accuracy",
        model_display_name=cfg.MODEL_NAME,
    ).set_display_name("Get custom training job details")

    # ========================================================================
    # model deployment when condition is met
    # ========================================================================
    with dsl.Condition(
        training_job_details_task.outputs["eval_metric"] > eval_acc_threshold,
        name="model-deploy-decision",
    ):
        # ===================================================================
        # create model archive file
        # ===================================================================
        create_mar_task = generate_mar_file(
            model_display_name=cfg.MODEL_NAME,
            model_version=cfg.VERSION,
            handler=gs_serving_dependencies_path,
            model=training_job_details_task.outputs["model"],
        ).set_display_name("Create MAR file")

        # ===================================================================
        # build custom serving container running TorchServe
        # ===================================================================
        # build custom container for serving predictions using
        # the trained model artifacts served by TorchServe
        build_custom_serving_image_task = build_custom_serving_image(
            project=cfg.PROJECT_ID,
            gs_serving_dependencies_path=gs_serving_dependencies_path,
            serving_image_uri=cfg.SERVE_IMAGE_URI,
        ).set_display_name("Build custom serving image")

        # ===================================================================
        # create model resource
        # ===================================================================
        # upload model to vertex ai
        model_upload_task = (
            aip_components.ModelUploadOp(
                project=cfg.PROJECT_ID,
                display_name=cfg.MODEL_DISPLAY_NAME,
                serving_container_image_uri=cfg.SERVE_IMAGE_URI,
                serving_container_predict_route=cfg.SERVING_PREDICT_ROUTE,
                serving_container_health_route=cfg.SERVING_HEALTH_ROUTE,
                serving_container_ports=cfg.SERVING_CONTAINER_PORT,
                serving_container_environment_variables=create_mar_task.outputs[
                    "mar_env_var"
                ],
                artifact_uri=create_mar_task.outputs["mar_export_uri"],
            )
            .set_display_name("Upload model")
            .after(build_custom_serving_image_task)
        )

        # ===================================================================
        # create Vertex AI Endpoint
        # ===================================================================
        # create endpoint to deploy one or more models
        # An endpoint provides a service URL where the prediction requests are sent
        endpoint_create_task = (
            aip_components.EndpointCreateOp(
                project=cfg.PROJECT_ID,
                display_name=cfg.MODEL_NAME + "-endpoint",
            )
            .set_display_name("Create endpoint")
            .after(create_mar_task)
        )

        # ===================================================================
        # deploy model to Vertex AI Endpoint
        # ===================================================================
        # deploy the model to the endpoint to associate physical resources
        # with the model so it can serve online predictions
        model_deploy_task = aip_components.ModelDeployOp(
            endpoint=endpoint_create_task.outputs["endpoint"],
            model=model_upload_task.outputs["model"],
            deployed_model_display_name=cfg.MODEL_NAME,
            dedicated_resources_machine_type=cfg.SERVING_MACHINE_TYPE,
            dedicated_resources_min_replica_count=cfg.SERVING_MIN_REPLICA_COUNT,
            dedicated_resources_max_replica_count=cfg.SERVING_MAX_REPLICA_COUNT,
            traffic_split=cfg.SERVING_TRAFFIC_SPLIT,
        ).set_display_name("Deploy model to endpoint")

        # ===================================================================
        # test model deployment
        # ===================================================================
        # test model deployment by making online prediction requests
        test_instances = [
            "Jaw dropping visual affects and action! One of the best I have seen to date.",
            "Take away the CGI and the A-list cast and you end up with film with less punch.",
        ]
        predict_test_instances_task = make_prediction_request(
            project=cfg.PROJECT_ID,
            bucket=cfg.BUCKET,
            endpoint=model_deploy_task.outputs["gcp_resources"],
            instances=test_instances,
        ).set_display_name("Test model deployment making online predictions")
        predict_test_instances_task

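Let's unpack this code and understand a few things:

- A component's inputs can be set from the pipeline inputs (passed as arguments) or they can depend on the outputs of other components within this pipeline. For example, `ModelUploadOp` depends on the custom serving container image URI from the `build_custom_serving_image` task as well as on pipeline inputs such as the project ID.
- `kfp.dsl.Condition` is a control structure containing a group of tasks that run only when the condition is met. In this pipeline, the model deployment steps run only when the trained model's performance exceeds the set threshold. Otherwise, those steps are skipped.
- Each component in the pipeline runs within its own container image. You can specify the machine type for each pipeline step, such as CPU, GPU, and memory limits. By default, each component runs as a Vertex AI CustomJob on an e2-standard-4 machine.
- By default, pipeline execution caching is enabled. The Vertex AI Pipelines service checks whether an execution of each pipeline step already exists in Vertex ML Metadata, using a combination of the pipeline name and the step's inputs, outputs, and component specification. When a matching execution already exists, the step is skipped, which reduces costs. Execution caching can be turned off at the task level or at the pipeline level.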
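
The idea behind execution caching can be pictured as a lookup by fingerprint. The sketch below is purely conceptual and is not how Vertex AI Pipelines computes its cache key: the `cache_key` function, the hashing scheme, and the sample values are all assumptions, chosen only to show that an identical step definition with identical inputs maps to the same key while a changed input does not.

```python
import hashlib
import json


def cache_key(pipeline_name, component_spec, inputs, output_names):
    """Fingerprint a step from the fields named above (illustrative only)."""
    payload = json.dumps(
        {
            "pipeline": pipeline_name,
            "component": component_spec,
            "inputs": inputs,
            "outputs": sorted(output_names),
        },
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


# Made-up step definition for illustration
spec = {"image": "python:3.9", "command": ["python", "step.py"]}
first = cache_key("pytorch-demo", spec, {"num_epochs": 2}, ["model"])
same = cache_key("pytorch-demo", spec, {"num_epochs": 2}, ["model"])
changed = cache_key("pytorch-demo", spec, {"num_epochs": 3}, ["model"])
print(first == same)     # identical step definition and inputs
print(first == changed)  # one input value differs
```

This is also why a step whose inputs embed a fresh timestamp on every run is unlikely to be served from the cache.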

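The following is the runtime graph generated for this pipeline:

![pytorch-pipeline-runtime-graph](./images/pytorch-pipeline-runtime-graph.png)

To learn more about building pipelines, refer to the [Building Kubeflow pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline#build-pipeline) section, and follow the [pipeline samples and tutorials](https://cloud.google.com/vertex-ai/docs/pipelines/notebooks#general-tutorials).

## Submit the pipeline

#### Compile the pipeline specification to JSON

After the pipeline is defined, it must be compiled before it can run on the Vertex AI Pipelines service. When the pipeline is compiled, the KFP SDK analyzes the data dependencies between the components to create a directed acyclic graph. The compiled pipeline is stored in JSON format and contains all the information required to run the pipeline.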

In [None]:
PIPELINE_JSON_SPEC_PATH = "./pipelines/pytorch_text_classifier_pipeline_spec.json"
compiler.Compiler().compile(
    pipeline_func=pytorch_text_classifier_pipeline, package_path=PIPELINE_JSON_SPEC_PATH
)
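
To make the directed acyclic graph concrete, the sketch below orders this pipeline's steps by their dependencies using the standard-library `graphlib` module. It is not how the KFP compiler works internally; the short task names are shorthand for the tasks in the pipeline function, and the dependency sets are read off its data dependencies and `.after()` calls, assuming that steps inside `dsl.Condition` wait for the step that produces the condition's metric.

```python
from graphlib import TopologicalSorter  # requires Python 3.9+

# task -> set of tasks it depends on (shorthand names, see lead-in)
dependencies = {
    "build_train_image": set(),
    "train": {"build_train_image"},
    "job_details": {"train"},
    "create_mar": {"job_details"},
    "build_serving_image": {"job_details"},
    "upload_model": {"create_mar", "build_serving_image"},
    "create_endpoint": {"create_mar"},
    "deploy_model": {"create_endpoint", "upload_model"},
    "test_deployment": {"deploy_model"},
}

# static_order() returns one valid execution order; independent steps
# (for example create_endpoint and build_serving_image) may appear in any order
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Steps that do not depend on each other can run in parallel, which is what the runtime graph in the console shows as side-by-side branches.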

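#### Submit the pipeline for execution on Vertex AI Pipelines

The pipeline is submitted to Vertex AI Pipelines by defining a `PipelineJob` with the Vertex AI SDK for Python and passing the required pipeline inputs.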

In [None]:
# initialize Vertex AI SDK
aiplatform.init(project=PROJECT_ID, location=REGION)

In [None]:
# define pipeline parameters
# NOTE: These parameters can be included in the pipeline config file as needed

PIPELINE_JOB_ID = f"pipeline-{APP_NAME}-{get_timestamp()}"
TRAIN_APP_CODE_PATH = f"{BUCKET_NAME}/{APP_NAME}/train/"
SERVE_DEPENDENCIES_PATH = f"{BUCKET_NAME}/{APP_NAME}/serve/"

pipeline_params = {
    "pipeline_job_id": PIPELINE_JOB_ID,
    "gs_train_script_path": TRAIN_APP_CODE_PATH,
    "gs_serving_dependencies_path": SERVE_DEPENDENCIES_PATH,
    "eval_acc_threshold": 0.87,
    "is_hp_tuning_enabled": "n",
}
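
A mismatch between the keys of `pipeline_params` and the pipeline function's parameters can be caught before the job is submitted. The following is a minimal sketch using `inspect.signature`: `pipeline_stub` is a hypothetical stand-in with the same parameters as `pytorch_text_classifier_pipeline`, `check_params` is a hypothetical helper, and the values are placeholders.

```python
import inspect


def pipeline_stub(
    pipeline_job_id: str,
    gs_train_script_path: str,
    gs_serving_dependencies_path: str,
    eval_acc_threshold: float,
    is_hp_tuning_enabled: str = "n",
):
    """Stand-in with the same parameters as the pipeline function."""


def check_params(func, params):
    """Return (missing required names, names the function does not accept)."""
    accepted = inspect.signature(func).parameters
    required = {
        name
        for name, param in accepted.items()
        if param.default is inspect.Parameter.empty
    }
    missing = sorted(required - set(params))
    unknown = sorted(set(params) - set(accepted))
    return missing, unknown


params = {
    "pipeline_job_id": "pipeline-demo-20220101000000",  # placeholder value
    "gs_train_script_path": "gs://my-bucket/app/train/",  # placeholder value
    "eval_acc_threshold": 0.87,
    "is_hp_tunning_enabled": "n",  # deliberate typo to show the check
}
missing, unknown = check_params(pipeline_stub, params)
print("missing:", missing)
print("unknown:", unknown)
```

In the notebook, the same check could be pointed at the real pipeline function and `pipeline_params`, provided the decorated function still exposes its original signature.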

In [None]:
# define pipeline job
pipeline_job = pipeline_jobs.PipelineJob(
    display_name=cfg.PIPELINE_NAME,
    job_id=PIPELINE_JOB_ID,
    template_path=PIPELINE_JSON_SPEC_PATH,
    pipeline_root=cfg.PIPELINE_ROOT,
    parameter_values=pipeline_params,
    enable_caching=True,
)

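When the pipeline is submitted, the logs show a link to view the pipeline run in the Google Cloud Console. You can also access the run by opening the [Vertex AI Pipelines dashboard](https://console.cloud.google.com/vertex-ai/pipelines).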

In [None]:
# submit pipeline job for execution
response = pipeline_job.run(sync=True)
response

## Monitoring the pipeline

You can monitor the progress of a pipeline execution by navigating to the [Vertex AI Pipelines dashboard](https://console.cloud.google.com/vertex-ai/pipelines).

```
INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/<project-id>/locations/<region>/pipelineJobs/pipeline-finetuned-bert-classifier-20220119061941
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/<project-id>/locations/<region>/pipelineJobs/pipeline-finetuned-bert-classifier-20220119061941')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/region/pipelines/runs/pipeline-finetuned-bert-classifier-20220119061941?project=<project-id>
```

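#### Component execution logs

Because every step in the pipeline runs in its own container or as a remote job (such as a Dataflow or Dataproc job), you can view the logs of a step by clicking the "View Logs" button on that step.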

![pipeline-step-logs](./images/pipeline-step-logs.png)

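#### Artifacts and lineage

In the pipeline graph, you can notice the small boxes after each step. Those are the artifacts generated by the step. For example, the "Create MAR file" step generates a MAR file as an artifact. Click an artifact to see more details about it.

![pipeline-artifact-and-lineage](./images/pipeline-artifact-and-lineage.png)

You can track the lineage of an artifact, which describes its relationship with the steps in the pipeline. Vertex AI Pipelines tracks metadata and lineage automatically. This lineage helps establish model governance and reproducibility. Click the "View Lineage" button on an artifact to display a lineage graph like the following.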

![artifact-lineage](./images/artifact-lineage.png)

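#### Comparing pipeline runs with the Vertex AI SDK

When running pipeline executions for different experiments, you may want to compare metrics across the pipeline runs. You can [compare pipeline runs](https://cloud.google.com/vertex-ai/docs/pipelines/visualize-pipeline#compare_pipeline_runs_using) from the Vertex AI Pipelines dashboard.

Alternatively, you can use the `aiplatform.get_pipeline_df()` method from the Vertex AI SDK for Python, which fetches the execution metadata of a pipeline and returns a Pandas dataframe.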

In [None]:
# underscores are not supported in the pipeline name, so
# replace underscores with hyphen
df_pipeline = aiplatform.get_pipeline_df(pipeline=cfg.PIPELINE_NAME.replace("_", "-"))
df_pipeline

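## Cleaning up

### Cleaning up training and deployment resources

To clean up all Google Cloud resources used in this notebook, you can [delete the Google Cloud project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources created in this tutorial:

- Training jobs
- Models
- Endpoints
- Cloud Storage bucket
- Container images
- Pipeline runs

Set flags for the resource types to be deleted.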

In [None]:
delete_custom_job = False
delete_hp_tuning_job = False
delete_endpoint = False
delete_model = False
delete_bucket = False
delete_image = False
delete_pipeline_job = False

Define clients for jobs, models, endpoints, and pipelines.

In [None]:
# API Endpoint
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)

# Vertex AI location root path for your dataset, model and endpoint resources
PARENT = f"projects/{PROJECT_ID}/locations/{REGION}"

client_options = {"api_endpoint": API_ENDPOINT}

# Initialize Vertex SDK
aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_NAME)

In [None]:
# functions to create client
def create_job_client():
    client = aip.JobServiceClient(client_options=client_options)
    return client


def create_model_client():
    client = aip.ModelServiceClient(client_options=client_options)
    return client


def create_endpoint_client():
    client = aip.EndpointServiceClient(client_options=client_options)
    return client


def create_pipeline_client():
    client = aip.PipelineServiceClient(client_options=client_options)
    return client


clients = {}
clients["job"] = create_job_client()
clients["model"] = create_model_client()
clients["endpoint"] = create_endpoint_client()
clients["pipeline"] = create_pipeline_client()

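Define functions that list the jobs, models, and endpoints whose display names start with the `APP_NAME` defined earlier in the notebook.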

In [None]:
def list_custom_jobs():
    client = clients["job"]
    jobs = []
    response = client.list_custom_jobs(parent=PARENT)
    for row in response:
        _row = MessageToDict(row._pb)
        if _row["displayName"].startswith(APP_NAME):
            jobs.append((_row["name"], _row["displayName"]))
    return jobs


def list_hp_tuning_jobs():
    client = clients["job"]
    jobs = []
    response = client.list_hyperparameter_tuning_jobs(parent=PARENT)
    for row in response:
        _row = MessageToDict(row._pb)
        if _row["displayName"].startswith(APP_NAME):
            jobs.append((_row["name"], _row["displayName"]))
    return jobs


def list_models():
    client = clients["model"]
    models = []
    response = client.list_models(parent=PARENT)
    for row in response:
        _row = MessageToDict(row._pb)
        if _row["displayName"].startswith(APP_NAME):
            models.append((_row["name"], _row["displayName"]))
    return models


def list_endpoints():
    client = clients["endpoint"]
    endpoints = []
    response = client.list_endpoints(parent=PARENT)
    for row in response:
        _row = MessageToDict(row._pb)
        if _row["displayName"].startswith(APP_NAME):
            endpoints.append((_row["name"], _row["displayName"]))
    return endpoints


def list_pipelines():
    client = clients["pipeline"]
    pipelines = []
    request = aip.ListPipelineJobsRequest(
        parent=PARENT, filter=f'display_name="{cfg.PIPELINE_NAME}"', order_by="end_time"
    )
    response = client.list_pipeline_jobs(request=request)

    for row in response:
        _row = MessageToDict(row._pb)
        pipelines.append(_row["name"])
    return pipelines
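
The job, model, and endpoint list functions above share the same display-name filter. As a minimal sketch, that filter can be factored into one function and tried out on plain dictionaries; `filter_by_display_name` is a hypothetical helper, and the sample rows are hand-written stand-ins for the `MessageToDict(row._pb)` results.

```python
def filter_by_display_name(rows, prefix):
    """Return (name, displayName) pairs whose display name starts with prefix."""
    return [
        (row["name"], row["displayName"])
        for row in rows
        if row.get("displayName", "").startswith(prefix)
    ]


# Hand-written rows for illustration only
rows = [
    {"name": "projects/1/locations/r/models/11", "displayName": "demo-app-20220101"},
    {"name": "projects/1/locations/r/models/12", "displayName": "other-model"},
    {"name": "projects/1/locations/r/models/13"},  # no display name set
]
matches = filter_by_display_name(rows, "demo-app")
print(matches)
```

Using `row.get("displayName", "")` keeps the filter from failing on a resource that has no display name in its dictionary form.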

### Delete custom training jobs

In [None]:
# Delete the custom training using the Vertex AI fully qualified identifier for the custom training
try:
    if delete_custom_job:
        custom_jobs = list_custom_jobs()
        for job_id, job_name in custom_jobs:
            print(f"Deleting job {job_id} [{job_name}]")
            clients["job"].delete_custom_job(name=job_id)
except Exception as e:
    print(e)

### Delete hyperparameter tuning jobs

In [None]:
# Delete the hyperparameter tuning jobs using the Vertex AI fully qualified identifier for the hyperparameter tuning job
try:
    if delete_hp_tuning_job:
        hp_tuning_jobs = list_hp_tuning_jobs()
        for job_id, job_name in hp_tuning_jobs:
            print(f"Deleting job {job_id} [{job_name}]")
            clients["job"].delete_hyperparameter_tuning_job(name=job_id)
except Exception as e:
    print(e)

### Undeploy models and delete the endpoint

In [None]:
# Delete the endpoint using the Vertex AI fully qualified identifier for the endpoint
try:
    if delete_endpoint:
        endpoints = list_endpoints()
        for endpoint_id, endpoint_name in endpoints:
            endpoint = aiplatform.Endpoint(endpoint_id)
            # undeploy models from the endpoint
            print(f"Undeploying all deployed models from the endpoint {endpoint_name}")
            endpoint.undeploy_all(sync=True)
            # deleting endpoint
            print(f"Deleting endpoint {endpoint_id} [{endpoint_name}]")
            clients["endpoint"].delete_endpoint(name=endpoint_id)
except Exception as e:
    print(e)

### Delete models

In [None]:
# Delete the model using the Vertex AI fully qualified identifier for the model
try:
    if delete_model:
        models = list_models()
        for model_id, model_name in models:
            print(f"Deleting model {model_id} [{model_name}]")
            clients["model"].delete_model(name=model_id)
except Exception as e:
    print(e)

### Delete pipeline runs

In [None]:
# Delete the pipeline execution using the Vertex AI fully qualified identifier for the pipeline job
try:
    if delete_pipeline_job:
        pipelines = list_pipelines()
        for pipeline_name in pipelines[:1]:
            print(f"Deleting pipeline run {pipeline_name}")
            if delete_custom_job:
                print("\t Deleting underlying custom jobs")
                pipeline_job = clients["pipeline"].get_pipeline_job(name=pipeline_name)
                pipeline_job = MessageToDict(pipeline_job._pb)
                task_details = pipeline_job["jobDetail"]["taskDetails"]
                for task in task_details:
                    if "containerDetail" in task["executorDetail"]:
                        custom_job_id = task["executorDetail"]["containerDetail"][
                            "mainJob"
                        ]
                        print(
                            f"\t Deleting custom job {custom_job_id} for task {task['taskName']}"
                        )
                        clients["job"].delete_custom_job(name=custom_job_id)
            clients["pipeline"].delete_pipeline_job(name=pipeline_name)
except Exception as e:
    print(e)
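
The lookup of the underlying custom jobs can be exercised offline on a plain dictionary. The sketch below assumes the same nested keys the cell above reads (`jobDetail`, `taskDetails`, `executorDetail`, `containerDetail`, `mainJob`); the `custom_job_ids` helper and the sample dictionary are hypothetical. It uses `.get()` so that tasks without an executor detail are skipped instead of raising a `KeyError`.

```python
def custom_job_ids(pipeline_job):
    """Collect (task name, mainJob resource name) for container-backed tasks."""
    task_details = pipeline_job.get("jobDetail", {}).get("taskDetails", [])
    job_ids = []
    for task in task_details:
        container = task.get("executorDetail", {}).get("containerDetail", {})
        if "mainJob" in container:
            job_ids.append((task.get("taskName", ""), container["mainJob"]))
    return job_ids


# Hand-written dictionary in the shape the cell above reads
sample_job = {
    "jobDetail": {
        "taskDetails": [
            {
                "taskName": "train",
                "executorDetail": {
                    "containerDetail": {
                        "mainJob": "projects/1/locations/r/customJobs/42"
                    }
                },
            },
            {"taskName": "condition"},  # no executor detail on this task
        ]
    }
}
found = custom_job_ids(sample_job)
print(found)
```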

### Delete contents from the staging bucket

---

***NOTE: Everything in this Cloud Storage bucket will be DELETED. Please run it with caution.***

---

In [None]:
if delete_bucket and "BUCKET_NAME" in globals():
    print(f"Deleting all contents from the bucket {BUCKET_NAME}")

    shell_output = ! gsutil du -as $BUCKET_NAME
    print(
        f"Size of the bucket {BUCKET_NAME} before deleting = {shell_output[0].split()[0]} bytes"
    )

    # uncomment below line to delete contents of the bucket
    # ! gsutil rm -r $BUCKET_NAME

    shell_output = ! gsutil du -as $BUCKET_NAME
    if float(shell_output[0].split()[0]) > 0:
        print(
            "PLEASE UNCOMMENT LINE TO DELETE BUCKET. CONTENT FROM THE BUCKET NOT DELETED"
        )

    print(
        f"Size of the bucket {BUCKET_NAME} after deleting = {shell_output[0].split()[0]} bytes"
    )

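### Delete images from Container Registry

Deletes all container images in the registry whose names start with the prefix defined by the `APP_NAME` variable in this tutorial. All associated tags are deleted as well.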

In [None]:
gcr_images = !gcloud container images list --repository=gcr.io/$PROJECT_ID --filter="name~"$APP_NAME

if delete_image:
    for image in gcr_images:
        if image != "NAME":  # skip header line
            print(f"Deleting image {image} including all tags")
            !gcloud container images delete $image --force-delete-tags --quiet

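### Cleaning up the notebook environment

After you are done experimenting, you can either stop or delete the AI Notebook instance to avoid further charges. If you want to keep your work, stop the instance instead of deleting it.

```
# Stop the notebook instance
gcloud notebooks instances stop example-instance --location=us-central1-a


# Delete the notebook instance
gcloud notebooks instances delete example-instance --location=us-central1-a
```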