##### Copyright 2021 The TensorFlow Authors.

In [None]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# 适用于 Vertex Pipelines 的简单 TFX 流水线


<div class="devsite-table-wrapper"><table class="tfo-notebook-buttons" align="left">
<td><a target="_blank" href="https://tensorflow.google.cn/tfx/tutorials/tfx/gcp/vertex_pipelines_simple"><img src="https://tensorflow.google.cn/images/tf_logo_32px.png">在 TensorFlow.org上查看</a></td>
<td>     <a target="_blank" href="https://colab.research.google.com/github/tensorflow/docs-l10n/blob/master/site/zh-cn/tfx/tutorials/tfx/gcp/vertex_pipelines_simple.ipynb"><img src="https://tensorflow.google.cn/images/colab_logo_32px.png">在 Google Colab 运行</a> </td>
<td><a target="_blank" href="https://github.com/tensorflow/docs-l10n/blob/master/site/zh-cn/tfx/tutorials/tfx/gcp/vertex_pipelines_simple.ipynb"><img width="32px" src="https://tensorflow.google.cn/images/GitHub-Mark-32px.png">在 Github 上查看源代码</a></td>
<td><a href="https://storage.googleapis.com/tensorflow_docs/tfx/docs/tutorials/tfx/gcp/vertex_pipelines_simple.ipynb"><img src="https://tensorflow.google.cn/images/download_logo_32px.png">下载笔记本</a></td>
<td><a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?q=download_url%3Dhttps://storage.googleapis.com/tensorflow_docs/docs-l10n/site/zh-cn/tfx/tutorials/tfx/gcp/vertex_pipelines_simple.ipynb"><img src="https://tensorflow.google.cn/images/download_logo_32px.png">在 Google Cloud Vertex AI Workbench 中运行</a></td>
</table></div>


这个基于笔记本的教程将创建一个简单的 TFX 流水线并使用 Google Cloud Vertex Pipelines 运行它。此笔记本基于我们在[简单 TFX 流水线教程](https://tensorflow.google.cn/tfx/tutorials/tfx/penguin_simple)中构建的 TFX 流水线。如果您不熟悉 TFX 并且尚未阅读该教程，则应在继续使用此笔记本之前阅读它。

Google Cloud Vertex Pipelines 以无服务器的方式编排您的 ML 工作流程，从而帮助您自动化、监控和管理 ML 系统。您可以配合使用 Python 和 TFX 来定义 ML 流水线，然后在 Google Cloud 上执行您的流水线。有关 Vertex Pipelines 的更多信息，请参阅 [Vertex Pipelines 介绍](https://cloud.google.com/vertex-ai/docs/pipelines/introduction)。

此笔记本旨在于 [Google Colab](https://colab.research.google.com/notebooks/intro.ipynb) 或 [AI Platform Notebooks](https://cloud.google.com/ai-platform-notebooks) 上运行。如果您不使用其中之一，只需单击上面的“在 Google Colab 中运行”按钮即可。

## 设置

在运行此笔记本之前，请确保您具有以下内容：

- [Google Cloud Platform](http://cloud.google.com/) 项目。
- [Google Cloud Storage](https://cloud.google.com/storage) 存储桶。请参阅[存储桶创建指南](https://cloud.google.com/storage/docs/creating-buckets)。
- 启用 [Vertex AI 和 Cloud Storage API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com,storage-component.googleapis.com)。

请参阅 [Vertex 文档](https://cloud.google.com/vertex-ai/docs/pipelines/configure-project)进一步配置 GCP 项目。

### 安装 python 软件包

我们将安装所需的 Python 软件包（包括 TFX 和 KFP），以编写 ML 流水线并将作业提交到 Vertex Pipelines。

In [None]:
# Use the latest version of pip.
!pip install --upgrade pip
!pip install --upgrade "tfx[kfp]<2"

#### 是否已重新启动运行时？

如果您使用的是 Google Colab，则在首次运行上面的代码单元时必须重新启动运行时，方法是单击上面的“重新启动运行时”按钮或使用“运行时 &gt; 重新启动运行时...”菜单。这样做的原因是 Colab 加载软件包的方式。

如果您不在 Colab 上，可以使用以下代码单元重新启动运行时。

In [None]:
# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

### 登录 Google 获取此笔记本

如果您在 Colab 上运行此笔记本，请使用您的用户帐户进行身份验证：

In [None]:
import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

**如果您在 AI Platform Notebooks 上**，请在运行下一部分之前通过 Google Cloud 进行身份验证，方法是运行

```sh
gcloud auth login
```

运行位置为**终端窗口** （可通过菜单中的**文件** &gt; **新建**打开）。您只需对每个笔记本实例执行一次此操作。

检查软件包版本。

In [None]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))

### 设置变量

我们将在下面设置一些用于自定义流水线的变量。所需信息如下：

- GCP 项目 ID。请参阅[找出项目 ID](https://cloud.google.com/resource-manager/docs/creating-managing-projects#identifying_projects)。
- 用于运行流水线的 GCP 区域。有关 Vertex Pipelines 在其中可用的区域的更多信息，请参阅 [Vertex AI 位置指南](https://cloud.google.com/vertex-ai/docs/general/locations#feature-availability)。
- 用于存储流水线输出的 Google Cloud Storage 存储桶。

**在下面的代码单元中输入所需值，然后再运行它**。


In [None]:
GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')

设置 `gcloud` 以使用您的项目。

In [None]:
!gcloud config set project {GOOGLE_CLOUD_PROJECT}

In [None]:
PIPELINE_NAME = 'penguin-vertex-pipelines'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for input data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This is the path where your model will be pushed for serving.
SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))

### 准备示例数据

我们将使用与[简单 TFX 流水线教程](https://allisonhorst.github.io/palmerpenguins/articles/intro.html)相同的 [Palmer Penguins 数据集](https://tensorflow.google.cn/tfx/tutorials/tfx/penguin_simple)。

此数据集中有四个数字特征，这些特征已标准化为具有范围 [0,1]。我们将建立一个预测企鹅 `species` 的分类模型。

我们需要创建我们自己的数据集副本。因为 TFX ExampleGen 从目录中读取输入，所以我们需要在 GCS 上创建一个目录并将数据集复制到其中。

In [None]:
!gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/

快速浏览一下 CSV 文件。

In [None]:
!gsutil cat {DATA_ROOT}/penguins_processed.csv | head

## 创建流水线

TFX 流水线是使用 Python API 定义的。我们将定义包含以下三个组件的流水线：CsvExampleGen、Trainer 和 Pusher。流水线和模型定义与[简单 TFX 流水线教程](https://tensorflow.google.cn/tfx/tutorials/tfx/penguin_simple)几乎相同。

唯一的区别是我们不需要设置用于定位 <a>ML Metadata</a> 数据库的 <code>metadata_connection_config</code>。因为 Vertex Pipelines 使用的是托管元数据服务，所以用户不必在意它，我们也无需指定参数。

在实际定义流水线之前，我们需要先为 Trainer 组件编写模型代码。

### 编写模型代码。

我们将使用与[简单 TFX 流水线教程](https://tensorflow.google.cn/tfx/tutorials/tfx/penguin_simple)中相同的模型代码。

In [None]:
_trainer_module_file = 'penguin_trainer.py'

In [None]:
%%writefile {_trainer_module_file}

# Copied from https://tensorflow.google.cn/tfx/tutorials/tfx/penguin_simple

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils


from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://tensorflow.google.cn/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')

将模块文件复制到可从流水线组件进行访问的 GCS。因为模型训练发生在 GCP 上，所以我们需要上传此模型定义。

否则，您可能需要构建包含模块文件的容器映像并使用该映像来运行流水线。

In [None]:
!gsutil cp {_trainer_module_file} {MODULE_ROOT}/

### 编写流水线定义

我们将定义一个函数来创建 TFX 流水线。

In [None]:
# Copied from https://tensorflow.google.cn/tfx/tutorials/tfx/penguin_simple and
# slightly modified because we don't need `metadata_path` argument.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     ) -> tfx.dsl.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

## 在 Vertex Pipelines 上运行流水线。

我们使用<a>简单 TFX 流水线教程</a>中的 <code>LocalDagRunner</code>，其在本地环境下运行。TFX 提供了多个编排器来运行流水线。在本教程中，我们将使用 Vertex Pipelines 和 Kubeflow V2 dag 运行程序。

我们需要定义一个运行程序来实际运行流水线。您将使用 TFX API 将您的流水线编译为我们的流水线定义格式。

In [None]:
# docs_infra: no_execute
import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        serving_model_dir=SERVING_MODEL_DIR))

生成的定义文件可以使用 kfp 客户端提交。

In [None]:
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging
logging.getLogger().setLevel(logging.INFO)

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.submit()

现在，您可以访问上面输出中的链接或访问 [Google Cloud Console](https://console.cloud.google.com/) 中的“Vertex AI &gt; 流水线”来查看进度。