# Create a Training Pipeline (Python Module Version)

## Introduction

This notebook creates a training pipeline so that we can train a model.

In [1]:
HOST = "https://aip-apis.nadileaf.com"
PROJECT_ID = "tokyo-rain-42"
LOCATION = "default"

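## Write the Training Code

In this example, we use `/tmp/aip` as the project root and create a module named `create-training-pipeline` inside it.
The module contains an `aip_task.py` file, which serves as the program entry point.
The resulting directory structure looks like this: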
```
.                            # Project root
├── ...
├── create-training-pipeline # Module root
│   ├── __init__.py          # Marks this folder as a Python package
│   └── aip_task.py          # Intended program entry point
└── ...
```

Let's first create the project and the module:

In [18]:
! mkdir -p /tmp/aip/create-training-pipeline
! touch /tmp/aip/create-training-pipeline/__init__.py

Use the `%%writefile` magic command to write the training code to `/tmp/aip/create-training-pipeline/aip_task.py`.

In [19]:
%%writefile /tmp/aip/create-training-pipeline/aip_task.py

import numpy as np
from tensorflow import keras


def get_args():
    import os
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--model-dir",
        dest="model_dir",
        default=os.getenv("AIP_MODEL_DIR"),
        type=str,
        help="Model dir.",
    )
    return parser.parse_args()


args = get_args()


def get_model():
    # Create a simple model.
    inputs = keras.Input(shape=(32,))
    outputs = keras.layers.Dense(1)(inputs)
    model = keras.Model(inputs, outputs)
    model.compile(optimizer="adam", loss="mean_squared_error")
    return model


model = get_model()

# Train the model.
test_input = np.random.random((128, 32))
test_target = np.random.random((128, 1))
model.fit(test_input, test_target)


def s3_save_keras_model(model, model_name, version="1"):
    import re
    import tempfile
    import boto3
    from pathlib import Path

    with tempfile.TemporaryDirectory() as tempdir:
        print(f"Saving model to {tempdir}")

        local_model_dir = Path(tempdir, model_name, version)
        model.save(local_model_dir, save_format="tf")

        client = boto3.client("s3")
        bucket_name, key_prefix = re.match("s3://([^/]+)/(.*)", args.model_dir).groups()
        if not key_prefix.endswith("/"):
            key_prefix += "/"

        local_dir = Path(tempdir)
        for item in local_dir.rglob("*"):
            if not item.is_file():
                continue
            relative = item.relative_to(local_dir)
            key_suffix = str(relative)
            key = key_prefix + key_suffix
            print(f"Uploading {item} to {key}")
            client.upload_file(
                Filename=str(item.absolute()),
                Bucket=bucket_name,
                Key=key,
            )


# Save the model in SavedModel format and upload it under `<model_dir>/naive_model/1/`.
s3_save_keras_model(model, "naive_model")

Overwriting /tmp/aip/create-training-pipeline/aip_task.py
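
The script above assumes `args.model_dir` is always an `s3://bucket/prefix` URI. If `AIP_MODEL_DIR` is unset and `--model-dir` is not passed, `re.match` receives `None` and raises a `TypeError`. A minimal sketch of a more defensive split is shown below; the helper name `split_s3_uri` and the example bucket are hypothetical and not part of the original script.

```python
import re


def split_s3_uri(uri):
    """Split 's3://bucket/prefix' into (bucket, key_prefix).

    Hypothetical helper: the returned prefix is either empty or ends
    with '/', so object keys can be appended to it directly.
    """
    match = re.fullmatch(r"s3://([^/]+)/?(.*)", uri or "")
    if match is None:
        raise ValueError(f"Not a valid S3 URI: {uri!r}")
    bucket, prefix = match.groups()
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    return bucket, prefix


# Example bucket and prefix are placeholders for illustration only.
result = split_s3_uri("s3://example-bucket/models/run-1")
print(result)
```

Raising a `ValueError` with the offending value tends to make a misconfigured job easier to diagnose than a failure inside `re.match`.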


## Package the Training Code


In [20]:
! rm -f /tmp/aip/create-training-pipeline.tar.gz
! tar -zcvf /tmp/aip/create-training-pipeline.tar.gz -C /tmp/aip/ create-training-pipeline/
! aws s3 cp /tmp/aip/create-training-pipeline.tar.gz s3://aip-demo/usages/create-training-pipeline.tar.gz

a create-training-pipeline
a create-training-pipeline/aip_task.py
a create-training-pipeline/__init__.py
upload: ../../../../../../../../tmp/aip/create-training-pipeline.tar.gz to s3://aip-demo/usages/create-training-pipeline.tar.gz
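
For environments where the `tar` command is unavailable, the packaging step can be approximated with the standard-library `tarfile` module. This is a sketch, not the notebook's method: the function name `package_module` is hypothetical, and the demonstration runs on a throwaway directory that only mirrors the layout used above. The upload to S3 is left out.

```python
import tarfile
import tempfile
from pathlib import Path


def package_module(project_root, module_name, archive_path):
    """Create a .tar.gz with the module directory at the archive's top level."""
    module_dir = Path(project_root) / module_name
    with tarfile.open(str(archive_path), "w:gz") as archive:
        # arcname keeps member paths relative to the project root,
        # which is what `tar -C <project_root> <module_name>/` does.
        archive.add(str(module_dir), arcname=module_name)
    # Re-open the archive to list what was written.
    with tarfile.open(str(archive_path), "r:gz") as archive:
        return sorted(archive.getnames())


# Demonstration on a temporary directory (hypothetical stand-in for /tmp/aip).
with tempfile.TemporaryDirectory() as root:
    module = Path(root, "create-training-pipeline")
    module.mkdir()
    (module / "__init__.py").touch()
    (module / "aip_task.py").write_text("print('hello')\n")
    names = package_module(root, "create-training-pipeline", Path(root, "pkg.tar.gz"))
    print(names)
```

Listing the archive members afterwards is a quick way to confirm that the module directory, rather than an absolute path, sits at the top level of the package.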


## Send the "Create Training Pipeline" Request

We use `IMAGE_URL` as the container image URL and run the `PYTHON_MODULE_NAME` module inside the `PYTHON_PACKAGE_URI` package.

In [21]:
from datetime import datetime

MACHINE_TYPE = "TBD"
ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
ACCELERATOR_COUNT = 0  # number of accelerators to attach (0 = none)

IMAGE_URL = "harbor.nadileaf.com/aip-images/create-training-image:0.1.1"
PYTHON_PACKAGE_URI = "s3://aip-demo/usages/create-training-pipeline.tar.gz"
PYTHON_MODULE_NAME = "create-training-pipeline.aip_task"

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
DISPLAY_NAME = "create-train-pipeline_" + TIMESTAMP

DISPLAY_NAME

'create-train-pipeline_20220812062200'

In [44]:
from devtools import pprint
import requests
import json

url = f"{HOST}/v1/projects/{PROJECT_ID}/locations/{LOCATION}/trainingPipelines"

payload = json.dumps(
    {
        "displayName": DISPLAY_NAME,
        "trainingTaskDefinition": "s3://aip/schema/trainingjob/definition/custom_task_1.0.0.yaml",
        "trainingTaskInputs": {
            "workerPoolSpecs": [
                {
                    "machineSpec": {
                        "machineType": MACHINE_TYPE,
                        "acceleratorType": ACCELERATOR_TYPE,
                        "acceleratorCount": ACCELERATOR_COUNT,
                    },
                    "pythonPackageSpec": {
                        "executorImageUri": IMAGE_URL,
                        "packageUris": [PYTHON_PACKAGE_URI],
                        "pythonModule": PYTHON_MODULE_NAME,
                    },
                }
            ]
        },
        "modelToUpload": {
            "displayName": DISPLAY_NAME,
            "predictSchemata": {},
            "containerSpec": {
                "imageUri": "tensorflow/serving:latest",
                "ports": [{"containerPort": 8501}],
                "env": [
                    {"name": "MODEL_BASE_PATH", "value": "/app"},
                    {"name": "MODEL_NAME", "value": "naive_model"},
                ],
                "predictRoute": "/v1/models/naive_model:predict",
            },
        },
    }
)
headers = {"Content-Type": "application/json"}

response = requests.request("POST", url, headers=headers, data=payload)

pprint(response.json())

{
    'name': 'projects/tokyo-rain-42/locations/default/trainingPipelines/00b5f52e-a467-4932-a051-5fa7c0c15adb',
    'displayName': 'create-train-pipeline_20220812062200',
    'trainingTaskDefinition': 's3://aip/schema/trainingjob/definition/custom_task_1.0.0.yaml',
    'trainingTaskInputs': {
        'workerPoolSpecs': [
            {
                'pythonPackageSpec': {
                    'executorImageUri': 'harbor.nadileaf.com/aip-images/create-training-image:0.1.1',
                    'packageUris': [
                        's3://aip-demo/usages/create-training-pipeline.tar.gz',
                    ],
                    'pythonModule': 'create-training-pipeline.aip_task',
                },
            },
        ],
    },
    'modelToUpload': {
        'name': '',
        'displayName': 'create-train-pipeline_20220812062200',
        'description': '',
        'metadataSchemaUri': '',
        'supportedExportFormats': [],
        'trainingPipeline': 'projects/tokyo-rain-42/

## Get the Training Pipeline's Name

In [45]:
TRAINING_PIPELINE_NAME = response.json()["name"]
TRAINING_PIPELINE_NAME

'projects/tokyo-rain-42/locations/default/trainingPipelines/00b5f52e-a467-4932-a051-5fa7c0c15adb'
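
The returned name is a resource path of the form `projects/<project>/locations/<location>/trainingPipelines/<id>`. When the individual components are needed (for logging, say), they can be pulled apart as sketched below. The helper `parse_pipeline_name` is hypothetical, and the pattern is inferred only from the example value shown above.

```python
import re

# Pattern inferred from the example output; treat it as an assumption.
_PIPELINE_NAME = re.compile(
    r"projects/(?P<project>[^/]+)"
    r"/locations/(?P<location>[^/]+)"
    r"/trainingPipelines/(?P<pipeline_id>[^/]+)"
)


def parse_pipeline_name(name):
    """Return the project, location, and pipeline ID from a resource name."""
    match = _PIPELINE_NAME.fullmatch(name)
    if match is None:
        raise ValueError(f"Unexpected training pipeline name: {name!r}")
    return match.groupdict()


parts = parse_pipeline_name(
    "projects/tokyo-rain-42/locations/default/trainingPipelines/"
    "00b5f52e-a467-4932-a051-5fa7c0c15adb"
)
print(parts)
```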

## Query the Training Pipeline's State

The model's name becomes available only once the `state` field is `PIPELINE_STATE_SUCCEEDED`.

In [51]:
import requests
from devtools import pprint

url = f"{HOST}/v1/{TRAINING_PIPELINE_NAME}"

payload = {}
headers = {}

response = requests.request("GET", url, headers=headers, data=payload)

pprint(response.json()["state"])
pprint(response.json())

'PIPELINE_STATE_SUCCEEDED'
{
    'name': 'projects/tokyo-rain-42/locations/default/trainingPipelines/00b5f52e-a467-4932-a051-5fa7c0c15adb',
    'displayName': 'create-train-pipeline_20220812062200',
    'trainingTaskDefinition': 's3://aip/schema/trainingjob/definition/custom_task_1.0.0.yaml',
    'trainingTaskInputs': {
        'workerPoolSpecs': [
            {
                'pythonPackageSpec': {
                    'packageUris': [
                        's3://aip-demo/usages/create-training-pipeline.tar.gz',
                    ],
                    'pythonModule': 'create-training-pipeline.aip_task',
                    'executorImageUri': 'harbor.nadileaf.com/aip-images/create-training-image:0.1.1',
                },
            },
        ],
    },
    'modelToUpload': {
        'name': 'projects/tokyo-rain-42/locations/default/models/750e6e67-526c-44a3-b173-05ea95a9cc43',
        'displayName': 'create-train-pipeline_20220812062200',
        'description': '',
        'met
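
Because the model name only appears after the pipeline succeeds, the single query above usually has to be repeated. The following is a rough polling sketch under stated assumptions: `wait_for_pipeline` is a hypothetical helper, and the state names other than `PIPELINE_STATE_SUCCEEDED` (`PIPELINE_STATE_FAILED`, `PIPELINE_STATE_CANCELLED`, `PIPELINE_STATE_RUNNING`) are assumed, since they do not appear in this notebook. The state lookup is passed in as a callable so the loop can be exercised without a network call.

```python
import time

SUCCEEDED = "PIPELINE_STATE_SUCCEEDED"
# Assumed failure states; verify against the service's actual values.
FAILURE_STATES = {"PIPELINE_STATE_FAILED", "PIPELINE_STATE_CANCELLED"}


def wait_for_pipeline(fetch_state, interval_seconds=10.0, max_attempts=60,
                      sleep=time.sleep):
    """Call fetch_state() until the pipeline succeeds, fails, or attempts run out."""
    for attempt in range(max_attempts):
        state = fetch_state()
        if state == SUCCEEDED:
            return state
        if state in FAILURE_STATES:
            raise RuntimeError(f"Pipeline ended in state {state}")
        # Do not sleep after the final attempt.
        if attempt < max_attempts - 1:
            sleep(interval_seconds)
    raise TimeoutError(f"Pipeline not finished after {max_attempts} attempts")


# Dry run with a fake state sequence and a no-op sleep.
fake_states = iter(["PIPELINE_STATE_RUNNING", "PIPELINE_STATE_RUNNING", SUCCEEDED])
final_state = wait_for_pipeline(lambda: next(fake_states), sleep=lambda _: None)
print(final_state)
```

In the notebook, `fetch_state` could be something like `lambda: requests.get(url).json()["state"]`, with `url` built from `TRAINING_PIPELINE_NAME` as in the cell above.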

## Get the Model's Name

In [52]:
MODEL_NAME = response.json()["modelToUpload"]["name"]
MODEL_NAME

'projects/tokyo-rain-42/locations/default/models/750e6e67-526c-44a3-b173-05ea95a9cc43'