# 使用Amazon SageMaker编排作业、进行模型注册与持续部署

Amazon SageMaker为机器学习应用开发者和MLOps工程师提供了强大的能力，可以用来编排SageMaker作业、构建可复现的机器学习管道、部署用于实时低延迟推理或离线批量推理的定制化模型，并追踪产物的血缘关系。您可以通过一个简单的界面，遵循机器学习应用开发的安全与最佳实践范式，在部署和监控生产工作流、部署模型产物以及追踪产物血缘关系中，建立起完善的运维实践。

SageMaker Pipelines服务支持一种名为SageMaker机器学习管道领域特定语言（DSL）的规范，它是一种声明式的Json规范。这种DSL定义了一个由管道参数和SageMaker作业步骤组成的有向无环图（DAG）。SageMaker Python软件开发工具包（SDK）则使用工程师和科学家们都已熟悉的编程结构，极大地简化了这个管道DSL的生成过程。

SageMaker模型注册中心（Model Registry）是用来存储、版本化和管理已训练模型的地方。数据科学家和机器学习工程师可以从一个单一的模型注册中心里，比较不同模型版本、审批用于部署的模型，以及部署来自不同AWS账户的模型。SageMaker让客户能够正确地起步并遵循MLOps的最佳实践。客户能够通过一次API调用，就搭建起一个完整的端到端MLOps系统。

## SageMaker Pipelines

Amazon SageMaker Pipelines支持以下活动：

*   **管道 (Pipelines)** - 一个由步骤和条件组成的有向无环图，用于编排SageMaker作业和资源创建。
*   **处理作业步骤 (Processing Job steps)** - 在SageMaker上运行数据处理工作负载的一种简化的、托管式的体验，例如特征工程、数据验证、模型评估和模型解释。
*   **训练作业步骤 (Training Job steps)** - 一个通过从训练数据集中提供样本来“教导”模型进行预测的迭代过程。
*   **条件执行步骤 (Conditional step execution)** - 在管道中提供分支的条件性执行能力。
*   **注册模型 (Registering Models)** - 在模型注册中心创建一个模型包资源，该资源可用于在Amazon SageMaker中创建可部署的模型。
*   **创建模型步骤 (Creating Model steps)** - 创建一个模型，用于后续的转换步骤或作为端点发布。
*   **参数化管道执行 (Parameterized Pipeline executions)** - 允许管道的执行根据提供的参数而有所不同。
*   **转换作业步骤 (Transform Job steps)** - 一种批量转换功能，用于预处理数据集以消除干扰训练或推理的噪声或偏差，从大型数据集中获取推理结果，以及在您不需要持久性端点时运行推理。

## SageMaker模型构建项目模板的布局

这个模板为将您的SageMaker Pipeline开发带入生产环境提供了一个起点。

```
|-- codebuild-buildspec.yml
|-- CONTRIBUTING.md
|-- pipelines
|   |-- abalone
|   |   |-- evaluate.py
|   |   |-- __init__.py
|   |   |-- pipeline.py
|   |   `-- preprocess.py
|   |-- get_pipeline_definition.py
|   |-- __init__.py
|   |-- run_pipeline.py
|   |-- _utils.py
|   `-- __version__.py
|-- README.md
|-- sagemaker-pipelines-project.ipynb
|-- setup.cfg
|-- setup.py
|-- tests
|   `-- test_pipelines.py
`-- tox.ini
```

下面对其中一些产物进行了描述：
<br/><br/>
您的CodeBuild执行指令：
```
|-- codebuild-buildspec.yml
```
<br/><br/>
您的管道产物，其中包含一个定义了必需的`get_pipeline`方法的管道模块（该方法返回一个SageMaker管道的实例）、一个用于特征工程的预处理脚本，以及一个用于衡量管道所训练模型均方误差（Mean Squared Error）的模型评估脚本：

```
|-- pipelines
|   |-- abalone
|   |   |-- evaluate.py
|   |   |-- __init__.py
|   |   |-- pipeline.py
|   |   `-- preprocess.py

```

对于管道所需的其他包含代码和/或产物的子文件夹，它们需要被`setup.py`文件正确地打包。例如，要打包一个`pipelines/source`文件夹：

*   在`source`文件夹内包含一个`__init__.py`文件。
*   将其添加到`setup.py`文件的`package_data`中，如下所示：

```
...
    packages=setuptools.find_packages(),
    include_package_data=True,
    package_data={"pipelines.my_pipeline.src": ["*.txt"]},
    python_requires=">=3.6",
    install_requires=required_packages,
    extras_require=extras,
...
```

<br/><br/>
用于获取管道定义json和运行管道的工具模块：

```
|-- pipelines
|   |-- get_pipeline_definition.py
|   |-- __init__.py
|   |-- run_pipeline.py
|   |-- _utils.py
|   `-- __version__.py
```
<br/><br/>
Python包的产物：
```
|-- setup.cfg
|-- setup.py
```
<br/><br/>
一个用于在您开发时测试管道的存根测试模块：
```
|-- tests
|   `-- test_pipelines.py
```
<br/><br/>
`tox`测试框架的配置：
```
`-- tox.ini
```

### 一个SageMaker管道

我们创建的管道遵循一个典型的机器学习应用模式：预处理、训练、评估，以及在模型质量达标时的条件化模型注册与发布。

![一个典型的机器学习应用管道](img/pipeline-full.png)

### 获取一些常量

我们从本地执行环境中获取一些常量。

In [2]:
import boto3
import sagemaker


region = boto3.Session().region_name
role = sagemaker.get_execution_role()
default_bucket = sagemaker.session.Session().default_bucket()

# Change these to reflect your project/business name or if you want to separate ModelPackageGroup/Pipeline from the rest of your team
model_package_group_name = f"IrisPackageGroup"
pipeline_name = f"IrisPipeline"

### 获取管道实例

在这里，我们从您的管道模块中获取管道实例，以便我们可以使用它。

In [3]:
from pipelines.iris.pipeline import get_pipeline


pipeline = get_pipeline(
    region=region,
    role=role,
    default_bucket=default_bucket,
    model_package_group_name=model_package_group_name,
    pipeline_name=pipeline_name,
    processing_instance_type="ml.m5.xlarge",
    training_instance_type="ml.m5.xlarge",
)

### 将管道提交到SageMaker并开始执行

让我们将我们的管道定义提交到工作流服务。传入的角色将被工作流服务用来创建步骤中定义的所有作业。

In [4]:
pipeline.upsert(role_arn=role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:us-west-2:439162431295:pipeline/irispipeline',
 'ResponseMetadata': {'RequestId': 'fe58479a-c49a-409f-af3a-1d8f559a0adb',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'fe58479a-c49a-409f-af3a-1d8f559a0adb',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '80',
   'date': 'Wed, 08 Feb 2023 07:09:02 GMT'},
  'RetryAttempts': 0}}

我们将启动管道，并接受所有默认参数。

在启动管道时，也可以向这些管道参数传入值，这将在稍后介绍。 

In [5]:
execution = pipeline.start(
    parameters = dict(
        MLflowTrackingURI="http://mlflow-lb-909049553.us-west-2.elb.amazonaws.com",
        MLflowExperimentName="sagemaker-mlflow-iris-test",
        MLflowModelName="sagemaker-mlflow-iris-model-test",
    )
)

### 管道操作：检查并等待管道执行

现在我们描述执行实例并列出执行中的步骤，以了解更多关于执行的信息。

In [6]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-west-2:439162431295:pipeline/irispipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-west-2:439162431295:pipeline/irispipeline/execution/ca2mwbsxtgim',
 'PipelineExecutionDisplayName': 'execution-1675836866755',
 'PipelineExecutionStatus': 'Succeeded',
 'PipelineExperimentConfig': {'ExperimentName': 'irispipeline',
  'TrialName': 'ca2mwbsxtgim'},
 'CreationTime': datetime.datetime(2023, 2, 8, 6, 14, 26, 659000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 2, 8, 6, 22, 12, 75000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-west-2:439162431295:user-profile/d-wovwnxollrhl/ml-learner',
  'UserProfileName': 'ml-learner',
  'DomainId': 'd-wovwnxollrhl'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-west-2:439162431295:user-profile/d-wovwnxollrhl/ml-learner',
  'UserProfileName': 'ml-learner',
  'DomainId': 'd-wovwnxollrhl'},
 'ResponseMetadata': {'RequestId': 'c0a24181-ee93-41b4-b5b1-d

我们可以通过在执行实例上调用`wait()`来等待执行完成：

In [None]:
execution.wait()

我们可以列出执行步骤来检查状态和产物：

In [7]:
execution.list_steps()

[{'StepName': 'IrisTuning',
  'StartTime': datetime.datetime(2023, 2, 8, 6, 18, 49, 6000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 2, 8, 6, 22, 11, 417000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'TuningJob': {'Arn': 'arn:aws:sagemaker:us-west-2:439162431295:hyper-parameter-tuning-job/ca2mwbsxtgim-iristun-wcuaygoeh2'}}},
 {'StepName': 'PrepareIrisData',
  'StartTime': datetime.datetime(2023, 2, 8, 6, 14, 27, 666000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 2, 8, 6, 18, 48, 295000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:439162431295:processing-job/pipelines-ca2mwbsxtgim-prepareirisdata-nl0blpf8iu'}}}]

### 参数化的执行

我们可以通过指定不同的管道参数来运行额外的管道执行。`parameters`参数是一个字典，其键是参数名称，其值是用于覆盖默认值的原始值。

特别值得注意的是，根据模型的性能，我们可能希望启动另一次管道执行，但这次使用计算优化型的实例，并将模型审批状态自动设置为“已批准”（Approved）。这意味着由`RegisterModel`步骤生成的模型包版本将自动准备好通过CI/CD管道进行部署，例如使用SageMaker Projects。

In [None]:
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType="ml.c5.xlarge",
        ModelApprovalStatus="Approved",
    )
)

In [None]:
execution.wait()

In [None]:
execution.list_steps()