# Automating Training and Deployment of an Image Classification Model with Vertex AI Pipelines (Custom Model)

## Step 1: Prepare the Environment

### 1.1 Install Dependencies

In [None]:
import os

# Google Cloud Notebook
if os.path.exists("/opt/deeplearning/metadata/env_version") or os.getenv("IS_TESTING"):
    USER_FLAG = "--user"
else:
    USER_FLAG = ""

! pip3 install --upgrade google-cloud-aiplatform $USER_FLAG

In [None]:
! pip3 install kfp google-cloud-pipeline-components --upgrade $USER_FLAG

In [None]:
if os.getenv("IS_TESTING"):
    ! pip3 install --upgrade --force-reinstall $USER_FLAG tensorflow==2.5 kfp google-cloud-aiplatform google-cloud-storage google-cloud-pipel

In [12]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)
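
After the kernel restarts, it can help to confirm that the packages installed above are actually visible to the new kernel. The following is a minimal sketch using only the standard library; the helper name `installed_versions` is hypothetical and not part of any Google SDK.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional

# Distribution names taken from the pip commands in this section.
REQUIRED_PACKAGES = [
    "google-cloud-aiplatform",
    "kfp",
    "google-cloud-pipeline-components",
]


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if missing."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


for name, version in installed_versions(REQUIRED_PACKAGES).items():
    print(f"{name}: {version or 'NOT INSTALLED'}")
```

If any package is reported as missing, rerun the install cells before continuing.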

### 1.2 Configure Environment Variables

In [12]:
PROJECT_ID = "kubeflow-demo-339102"
REGION = "us-central1"
BUCKET_NAME = "gs://kubeflow-pipeline-demo-charles" 

In [None]:
!gcloud config set project {PROJECT_ID}

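### 1.3 Create Resources

Create a GCS bucket. The bucket is used to:  
- Store the custom training code. When a Vertex training job starts, it downloads the training code from this bucket and then begins training.  
- Store the output of each Vertex Pipeline step. A pipeline usually consists of several steps, and a step often consumes output produced by one or more earlier steps.  
- Store the trained model. When training finishes, the Vertex training job writes the model to this bucket, and the Vertex prediction endpoint later downloads the model from it at deployment time.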
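
The three uses above map to separate prefixes inside the bucket. The sketch below derives them from the bucket URI. The `training/`, `output` and `output/model` prefixes are the ones this notebook uses later; the `pipeline_root` prefix and the helper name `bucket_layout` are assumptions made for illustration.

```python
from typing import Dict


def bucket_layout(bucket_uri: str) -> Dict[str, str]:
    """Return the GCS prefixes used for code, pipeline outputs, and the model."""
    root = bucket_uri.rstrip("/")
    if not root.startswith("gs://"):
        raise ValueError(f"expected a gs:// URI, got {bucket_uri!r}")
    return {
        # Where the training code package is uploaded.
        "training_code": f"{root}/training/",
        # Assumption: a folder for per-step pipeline outputs.
        "pipeline_root": f"{root}/pipeline_root",
        # Base output directory handed to the training job.
        "model_output": f"{root}/output",
        # Where the trained model artifacts end up.
        "model_artifacts": f"{root}/output/model",
    }


for purpose, uri in bucket_layout("gs://my-example-bucket").items():
    print(f"{purpose}: {uri}")
```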

In [None]:
! gsutil mb -l $REGION $BUCKET_NAME

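### 1.4 Configure Permissions

Vertex AI automatically creates and assigns a service account for the Workbench instance. Granting that service account permissions on the bucket lets you upload data to, and download data from, the bucket from the Workbench instance.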

In [11]:
import os
import sys

# If on Google Cloud Notebook, then don't execute this code
if not os.path.exists("/opt/deeplearning/metadata/env_version"):
    if "google.colab" in sys.modules:
        from google.colab import auth as google_auth

        google_auth.authenticate_user()

    # If you are running this notebook locally, replace the string below with the
    # path to your service account key and run this cell to authenticate your GCP
    # account.
    elif not os.getenv("IS_TESTING"):
        %env GOOGLE_APPLICATION_CREDENTIALS ''

In [None]:
SERVICE_ACCOUNT = ""  # @param {type:"string"}
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Read the active account from `gcloud auth list`
    shell_output = !gcloud auth list 2>/dev/null
    # The third output line looks like "*  <account>"; drop the "*" marker
    SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()
    print("Service Account:", SERVICE_ACCOUNT)
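
The cell above assumes the active account is always on the third line of the `gcloud auth list` output. A slightly more defensive approach is to look for the line carrying the `*` marker. The sketch below works on a list of output lines; the sample lines imitate the default table layout and the account name in them is made up.

```python
from typing import List, Optional


def active_account(lines: List[str]) -> Optional[str]:
    """Return the account on the line marked with '*', or None if absent."""
    for line in lines:
        stripped = line.strip()
        if stripped.startswith("*"):
            return stripped.lstrip("*").strip()
    return None


# Illustrative output only; the account below is hypothetical.
sample_output = [
    "          Credentialed Accounts",
    "ACTIVE  ACCOUNT",
    "*       my-sa@my-project.iam.gserviceaccount.com",
]

print("Service Account:", active_account(sample_output))
```

In the notebook, `shell_output` from the `!gcloud auth list` call could be passed to `active_account` in place of `sample_output`.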

In [None]:
Paste the service account printed in the previous step below.

In [14]:
SERVICE_ACCOUNT = "71837221026-compute@developer.gserviceaccount.com"

In [15]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_NAME

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_NAME

### 1.5 Upload the Training Code Package

In [28]:
TRAINING_CODE_PACKAGE_DIR = BUCKET_NAME + "/training/"

In [None]:
! gsutil cp custom-training.tar.gz $TRAINING_CODE_PACKAGE_DIR

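## Step 2: Write the Vertex AI Pipeline

Vertex AI Pipeline provides a full-featured SDK that developers can use to build machine learning workflows quickly and to orchestrate their steps in a specific order and logic. Vertex AI Pipeline is also deeply integrated with the rest of Vertex AI and with other GCP services, so workflow tasks such as creating a Vertex AI training job, deploying a Vertex AI model, or transferring data to BigQuery are easy to create. Vertex AI Pipeline is compatible with the open-source Kubeflow Pipelines interface and extends it, so developers who already know Kubeflow Pipelines can pick it up quickly.

In this lab we use the Vertex AI Pipeline SDK to define an end-to-end machine learning workflow made up of four steps:  
- Create a Vertex AI training job. The job downloads the training code from the GCS bucket and starts training. When training finishes, the job automatically uploads the trained model to the GCS bucket.
- Create a Vertex AI Model whose source is the model files written to GCS in the first step.
- Create a Vertex AI Endpoint.
- Deploy the Vertex AI Model to the Vertex AI Endpoint.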
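
The ordering of the four steps can be pictured as a small dependency graph. The sketch below uses the standard-library `graphlib` module (Python 3.9+) rather than the pipeline SDK, and assumes each step waits for the one before it, with deployment also needing the endpoint. The step names are illustrative only.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps it must wait for.
dependencies = {
    "train_model": set(),
    "upload_model": {"train_model"},
    "create_endpoint": {"upload_model"},
    "deploy_model": {"upload_model", "create_endpoint"},
}

# A topological sort yields an order that respects every dependency.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
# ['train_model', 'upload_model', 'create_endpoint', 'deploy_model']
```

In the pipeline definition itself, the same ordering is expressed by passing one task's outputs to another or by calling `.after()` on a task.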


In [32]:
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google_cloud_pipeline_components.experimental.custom_job import utils
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component

Define the variables below. They are passed into the Vertex AI Pipeline as parameters and forwarded to the individual pipeline tasks.

In [45]:
MODEL_DIR = BUCKET_NAME + "/output/model"
TRAINING_PACKAGE_FILE = BUCKET_NAME + "/training/custom-training.tar.gz"
MODEL_OUTPUT_DIR = BUCKET_NAME + "/output"
MODEL_NAME = "image-classification-custom-model"

Initialize the Vertex AI SDK with the project and staging bucket.

In [34]:
aip.init(project=PROJECT_ID, staging_bucket=BUCKET_NAME)

Define the Vertex AI Pipeline. Every task in the pipeline is declared in the code below.

In [40]:
@component
def print_op(input1: str):
    print("training task: {}".format(input1))
    

@dsl.pipeline(name="custom-training-pipeline")
def pipeline(
    project_id: str = PROJECT_ID,
    staging_bucket: str = BUCKET_NAME,
    training_package_file: str = TRAINING_PACKAGE_FILE,
    model_output_dir: str = MODEL_OUTPUT_DIR,
    model_name: str = MODEL_NAME,
    image_uri: str = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-7:latest",
    artifact_uri: str = MODEL_DIR,
): 
    start_msg = print_op("starting pipeline")
    # Step 1: run the custom training package; the model is written under base_output_dir
    model_training_op = gcc_aip.CustomPythonPackageTrainingJobRunOp(
        project = project_id,
        display_name = "model_training",
        container_uri = "us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-7:latest",
        staging_bucket = staging_bucket,
        python_package_gcs_uri = training_package_file,
        python_module_name = "trainer.task",
        base_output_dir = model_output_dir,
        )
    model_training_op.after(start_msg)
    # Step 2: register the trained artifacts as a Vertex AI Model
    model_upload_op = gcc_aip.ModelUploadOp(
        project = project_id,
        display_name = "model_upload",
        serving_container_image_uri = image_uri,
        artifact_uri = artifact_uri,
        )
    model_upload_op.after(model_training_op)
    endpoint_op = gcc_aip.EndpointCreateOp(
        project = project_id,
        location = "us-central1", 
        display_name = "custom-model-predict",
        )
    endpoint_op.after(model_upload_op)
    gcc_aip.ModelDeployOp(
        model=model_upload_op.outputs["model"],
        endpoint=endpoint_op.outputs["endpoint"],
        dedicated_resources_machine_type="n1-standard-2",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=2,
    )

## Step 3: Compile the Vertex AI Pipeline

Once the pipeline code is written, it can be compiled. Compilation produces a JSON file that defines the pipeline, and that file can then be used to create a pipeline run.
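
To see what the compiled file contains, the JSON can be loaded and its tasks listed. The sketch below is written under an assumption about the file layout, namely that tasks live under `root.dag.tasks` (optionally nested inside a top-level `pipelineSpec`) and that each task may carry a `dependentTasks` list. The layout can differ between KFP versions, and the task names in the sample document are made up.

```python
import json
from typing import Dict, List


def load_pipeline_doc(path: str) -> dict:
    """Read a compiled pipeline JSON file from disk."""
    with open(path) as f:
        return json.load(f)


def list_tasks(pipeline_doc: dict) -> Dict[str, List[str]]:
    """Map each task name to the tasks it depends on (assumed layout)."""
    spec = pipeline_doc.get("pipelineSpec", pipeline_doc)
    tasks = spec["root"]["dag"]["tasks"]
    return {name: sorted(task.get("dependentTasks", [])) for name, task in tasks.items()}


# Hand-written stand-in for a compiled file; not real compiler output.
sample_doc = {
    "pipelineSpec": {
        "root": {
            "dag": {
                "tasks": {
                    "train-model": {},
                    "upload-model": {"dependentTasks": ["train-model"]},
                    "create-endpoint": {"dependentTasks": ["upload-model"]},
                    "deploy-model": {
                        "dependentTasks": ["upload-model", "create-endpoint"]
                    },
                }
            }
        }
    }
}

for name, deps in list_tasks(sample_doc).items():
    print(f"{name} <- {deps}")
```

With a real file, `list_tasks(load_pipeline_doc(path))` would be called with the path passed to the compiler as `package_path`.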

In [41]:
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="custom-training-pipeline.json",
)

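## Step 4: Run the Vertex AI Pipeline

Vertex AI Pipeline offers two ways to create a run:
- In the GCP console, open the Vertex AI Pipeline page and upload the JSON file compiled in the previous step.
- Call the Vertex AI Pipeline SDK and point it at the JSON file compiled in the previous step.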

In [None]:
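from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

DISPLAY_NAME = "custom-training-pipeline_" + TIMESTAMP

# Assumption: PIPELINE_ROOT is not defined earlier in this notebook, so a
# folder under the bucket is used here to hold the output of each step.
PIPELINE_ROOT = BUCKET_NAME + "/pipeline_root"

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    # Must match the package_path passed to the compiler in Step 3
    template_path="custom-training-pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
)

job.run()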
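
The compiled file name is typed in two places, once at compile time and once at submission, and the two must agree. One way to keep them in sync is to define the name once and to build the display name with a small helper. This is a minimal sketch; `PIPELINE_PACKAGE_PATH` and `unique_display_name` are hypothetical names, not part of the SDK.

```python
from datetime import datetime

# Single source of truth for the compiled pipeline file name.
PIPELINE_PACKAGE_PATH = "custom-training-pipeline.json"


def unique_display_name(prefix: str, now: datetime) -> str:
    """Append a second-resolution timestamp so every run gets a distinct name."""
    return f"{prefix}_{now.strftime('%Y%m%d%H%M%S')}"


display_name = unique_display_name("custom-training-pipeline", datetime.now())
print(display_name, PIPELINE_PACKAGE_PATH)
```

The same constant could then be passed as `package_path` to the compiler and as `template_path` to `aip.PipelineJob`.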

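While the pipeline is running, you can follow the execution of the whole workflow on the Pipelines page, and check the status of the created resources on the Vertex AI Training, Models, and Endpoints pages.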