# Practical Data Science with Amazon SageMaker
_**使用 Amazon SageMaker 来解决端到端机器学习问题**_

---

学习使用 Amazon SageMaker 创建、优化和部署机器学习 (ML) 模型，以预测移动电话服务提供商的客户流失情况。

## 模块

1. [准备用于训练的数据集](#Prepare-a-dataset-for-training)
2. [训练和评估模型](#Train-and-evaluate-a-model)
3. [自动优化模型](#Automatically-tune-the-model)
4. [让模型做好生产准备](#Make-the-model-production-ready)
5. [AWS Auto Scaling](#AWS-Auto-Scaling)
6. [错误的相对成本](#Relative-cost-of-errors)
  
---

## 准备用于训练的数据集

### 有关客户流失预测的问题

对于任何企业来说，失去客户的代价都比较高昂。通过尽早发现不满意的客户，您便有机会采取激励措施来留住他们。您将使用 ML 来自动识别不满意的客户，此操作也称为*客户流失预测*。

ML 模型很少能提供完美的预测结果，因此您将学习如何在不超过总 ML 成本的情况下针对预测错误进行调整。

您可能对这个客户流失示例很熟悉，即客户不再使用某家移动电话服务提供商的服务，而改为使用其竞争对手的服务。如果提供商知道某位客户正在考虑弃用其服务，便可以及时采取激励措施或者升级客户所用套餐，从而鼓励客户继续使用其服务。 

激励措施往往比失去和重新获得客户更具有成本效益。

### 数据集

移动电话服务提供商会保留有关曾弃用了一段时间但最终继续使用其服务的客户的历史记录。您可以使用这些数据，通过称为*训练*的流程为移动电话服务提供商的客户流失情况构建 ML 模型。

您将训练该模型，还可以将任意客户的资料信息传递给该模型，并让其预测该客户是否会流失。模型会出错，预测未来这件事本身就比较棘手。但是，您将学习如何管理预测错误。

所使用的数据集是公开的，Daniel T. Larose 所著的《[*Discovering Knowledge in Data*](https://www.amazon.com/dp/0470908742/)》一书中提到了该数据集。作者称该数据集来自加利福尼亚大学尔湾分校的机器学习数据集存储库。

作为实验设置的一部分，相关数据集 churn.txt 已下载好并可供本练习使用。

In [None]:
!pip3 install --upgrade pandas

import pandas as pd
pd.__version__

环境准备工作，安装必须的升级包，确保使用能够正常进行。
#### 请仔细阅读要求：
Make sure pandas version is set to 1.2.4 or later. If it is not the case, restart the kernel before going further

In [None]:
!head './churn.txt'

上述内容看起来像一个带标题行的 CSV 文件。

您将使用 pandas 库加载和显示此原始数据集。

生成的 `churn` 变量是 pandas [DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html)。

In [None]:
import pandas as pd
churn = pd.read_csv('./churn.txt')
churn

我们来深入了解一下。按照现代标准，这是一个相对较小的数据集，只有 3333 条记录。

每条记录使用 21 个属性来描述未知美国移动电话服务提供商的客户资料。

这些属性如下：

- `State`：客户居住的州（美国），用两个字母组成的缩写表示。例如，OH 或 NJ
- `Account Length`：此账户处于活跃状态的天数
- `Area Code`：相应客户电话号码的三位数区号
- `Phone`：七位数的电话号码
- `Int’l Plan`：客户是否使用了国际通话套餐：yes/no
- `VMail Plan`：客户是否启用了语音信箱功能：yes/no
- `VMail Message`：每月平均语音邮件数量
- `Day Mins`：一天中使用的总通话分钟数
- `Day Calls`：一天中拨打电话的总次数
- `Day Charge`：白天的通话费用
- `Eve Mins、Eve Calls、Eve Charge`：晚间打电话的费用
- `Night Mins`、`Night Calls`、`Night Charge`：夜间打电话的费用
- `Intl Mins`、`Intl Calls`、`Intl Charge`：国际通话费用
- `CustServ Calls`：给客户服务人员打电话的次数
- `Churn?`：客户是否会弃用服务：true/false

最后一个属性 `Churn?` 称为*目标属性*，即我们希望 ML 模型预测的属性。由于目标属性采用二进制，我们的模型将执行二元预测，也称为*二元分类*。

现在，您将探索数据以及属性之间的关系并使其实现可视化。

我们已经使用了 pandas。此外，我们还建议您使用 numpy 和/或 matplotlib 库。

In [None]:
import numpy as np
import matplotlib.pyplot as plt

第一步是用直方图来查看各个属性的值的分布情况，并计算出数值属性的汇总统计数据，如均值、最小值、最大值、标准差等。

对于类别变量，我们需要查看频率表。

虽然可以通过多种方法在 Python 中执行此操作，但我们将使用以下 pandas 函数：[`hist()`](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.hist.html#pandas.DataFrame.hist)、[`describe()`](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.describe.html#pandas.DataFrame.describe)、[`crosstab()`](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.crosstab.html) 以及 [`select_dtypes()`](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.select_dtypes.html)。

为了在 Jupyter 笔记本中显示直方图，我们还将使用 IPython 的 [`%matplotlib inline`](https://ipython.readthedocs.io/en/stable/interactive/plotting.html) 魔法函数。

In [None]:
# 显示每个分类特征的频率表和唯一值的数量
for column in churn.select_dtypes(include=['object']).columns:
    display(pd.crosstab(index=churn[column],
                        columns='% observations',
                        normalize='columns'))
    print("# of unique values {}".format(churn[column].nunique()))

# 显示汇总统计数据
display(churn.describe())

# 为每个数值特征构建直方图
%matplotlib inline
hist = churn.hist(bins=30, sharey=True, figsize=(10, 10))

我们可以立即看到：
* `State` 的分布看起来相当平均
* `Phone` 具有过多的唯一值，无法用于任何实际用途。解析出前缀可能会有一定的价值，但除非您有更多关于如何分配前缀的上下文，否则请避免使用它。
* 只有 14% 的客户流失了，因此存在一定的分类不平衡问题，但并不是很极端。
* 大多数数值特征的分布情况都出奇得好，其中许多数值特征呈现出钟形的高斯性。`VMail Message` 是个显著的例外（并且 `Area Code` 显示为一项特征，我们应该将其转换为非数字形式）。

### 练习 1

现在，我们已经展示了如何完成练习的第一部分，接下来，该您完成第二部分，看看每个特征与目标变量 `Churn?` 之间的关系。

您可以使用相同的 pandas 函数 crosstab() 和 hist() 在下面的单元格中输入代码来完成该练习。


In [None]:
for column in churn.select_dtypes(include=['object']).columns:
    if column != 'Churn?':
        display(pd.crosstab(
            index=churn[column], columns=churn['Churn?'], normalize='columns'))

for column in churn.select_dtypes(exclude=['object']).columns:
    print(column)
    hist = churn[[column, 'Churn?']].hist(by='Churn?', bins=30)
    plt.show()

根据您的分析，您的解决方案可能会显示不同的结果，但是您应该会发现流失的客户具有以下特点：
* 在地理位置方面的分布相当均匀
* 更有可能使用国际套餐
* 不太可能使用语音邮件套餐
* 在每天的分钟数方面呈现出某种双峰状态（高于或低于非流失客户的平均值）
* 给客户服务人员打了很多电话（这一点很有意义，因为我们预计遇到很多问题的客户更容易流失）

此外，我们发现流失客户在 `Day Mins` 和 `Day Charge` 等特征方面的分布情况非常相似。 

这不足为奇，因为通话分钟数与费用相关。

### 练习 2

探索属性之间的成对关系，以了解它们之间的关系。请使用 pandas 函数 [corr()](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.corr.html) 和 [scatter_matrix()](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.plotting.scatter_matrix.html) 来执行此操作。

在下面的单元格中输入您的解决方案。

In [None]:
display(churn.corr())
pd.plotting.scatter_matrix(churn, figsize=(20, 20))
plt.show()

您应该会发现有几项特征彼此之间在本质上具有 100% 的相关性。在一些机器学习算法中加入这些特征对可能会产生灾难性问题，而在另一些机器学习算法中，这么做只会产生轻微的冗余和偏差。

在为模型训练做准备的过程中，请先删除所发现的对我们的目的无用的列。

删除 `Phone` 和 `Area Code` 属性：

In [None]:
churn = churn.drop(['Phone', 'Area Code'], axis=1)

然后从每个高度相关的特征对中删除一个特征： 

* 从 `Day Charge` 与 `Day Mins` 特征对中删除 `Day Charge`
* 从 `Night Charge` 与 `Night Mins` 特征对中删除 `Night Charge`
* 从 `Intl Charge` 与 `Intl Mins` 特征对中删除 `Intl Charge`

In [None]:
churn = churn.drop(['Day Charge', 'Eve Charge',
                    'Night Charge', 'Intl Charge'], axis=1)

现在我们已经完成了对数据的所有更改，在进行训练之前，让我们最后看一下数据的具体情况。

In [None]:
churn.head()

## 训练和评估模型

现在，请确定要使用的算法。如上所述，似乎有些变量的高值和低值（但不是中间值）都可以预测客户是否会流失。为了在线性回归之类的算法中反映这一点，我们需要生成多项式（或*分桶*）项目。 

让我们尝试改用梯度提升树针对这个问题构建模型。 

Amazon SageMaker 提供了 XGBoost 容器，我们可以用它在托管的分布式设置中训练模型，然后将其作为实时预测终端节点进行托管。XGBoost 使用梯度提升树，这些树自然地考虑到了特征与目标变量之间的非线性关系，并反映了特征之间的复杂交互。

Amazon SageMaker XGBoost 可以基于 CSV 或 LibSVM 格式的数据进行训练。在本示例中，我们将继续使用 CSV 格式的数据。它应该满足以下条件：
* 在第一列中有预测器变量
* 没有标头行

首先将分类特征转换为数值特征。

In [None]:
model_data = pd.get_dummies(churn)
model_data = pd.concat([model_data['Churn?_True.'], model_data.drop(
    ['Churn?_False.', 'Churn?_True.'], axis=1)], axis=1)

现在，将数据分为训练、验证和测试数据集。这将有助于防止模型过度拟合，并让您能够根据尚未看到的数据测试模型的准确性。

In [None]:
train_data, validation_data, test_data = np.split(model_data.sample(
    frac=1, random_state=1729), [int(0.7 * len(model_data)), int(0.9 * len(model_data))])
train_data.to_csv('train.csv', header=False, index=False)
validation_data.to_csv('validation.csv', header=False, index=False)

将这些文件上传到 Amazon S3，其中的 SageMaker 需要使用这些文件。

boto 是用于调用 AWS API 的标准 Python 库。我们将对训练和模型数据使用相同的 S3 存储桶和前缀。 

它们应与笔记本实例、训练和托管资源位于同一区域内。

In [None]:
import os
import boto3
import sagemaker

sess = sagemaker.Session()

bucket = sess.default_bucket()
prefix = 'bootcamp-xgboost-churn'

boto3.Session().resource('s3').Bucket(bucket).Object(
    os.path.join(prefix, 'train/train.csv')).upload_file('train.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(
    prefix, 'validation/validation.csv')).upload_file('validation.csv')

## 训练

接下来，指定 XGBoost 算法容器的位置。

Amazon SageMaker 算法会打包为 Docker 镜像。这使您可以灵活地将几乎所有算法代码与 Amazon SageMaker 结合使用，而无需考虑实施语言、相关库、框架等。

In [None]:
# 设置 IAM 角色
from sagemaker import get_execution_role

role = get_execution_role()
# 获取 XGBoost Docker 镜像
from sagemaker.amazon.amazon_estimator import get_image_uri
container = sagemaker.image_uris.retrieve(region=boto3.Session().region_name, framework='xgboost', version='latest')
display(container)

[SageMaker Python 软件开发工具包](https://sagemaker.readthedocs.io/en/latest/)提供了以下有关使用 Amazon SageMaker 的高级别抽象内容：

* 评估程序：封装有关 SageMaker 的训练。
* 模型：封装所构建的 ML 模型。
* 预测器：针对 SageMaker 终端节点使用 Python 数据类型提供实时推理和转换。
* 会话：提供了一系列使用 SageMaker 资源的方法。

先创建 [xgboost 评估程序](https://sagemaker.readthedocs.io/en/latest/estimators.html)。必需的参数如下：image_name、role、session、instance_type 以及 instance_count。

对于此培训作业，请使用：

* `image_name = container`
* `role=role`
* `sagemaker_session = sess`
* `train_instance_count = 1`
* `train_instance_type = ml.m5.xlarge` 

### 练习 3

完成评估程序定义。

In [None]:
# 创建 SageMaker 评估程序对象
import sagemaker
sess = sagemaker.Session()

xgb = sagemaker.estimator.Estimator(container,
                                    role,
                                    train_instance_count=1,
                                    train_instance_type='ml.m5.xlarge',
                                    output_path='s3://{}/{}/output'.format(
                                        bucket, prefix),
                                    sagemaker_session=sess)

ML 算法是根据其超参数进行配置和优化的，这改变了算法的运作方式。

[XGBoost 文档](https://xgboost.readthedocs.io/en/latest/parameter.html)中有对 XGBoost 超参数的描述。

在本示例中，XGBoost 所需的超参数如下：

* `objective` – 指定学习任务和相应的学习目标。请对二元分类任务使用 **binary:logistic**。 
* `num_round` – 控制轮数。系统将使用上一轮的输出来训练每一轮。轮数越多，对训练数据的拟合就越好，但计算成本也会越高，或者会导致过度拟合。

另外几个关键的超参数如下：

* `max_depth`：用于控制算法中每棵树可以构建的深度。树越深，就越能更好地拟合，但计算成本也越高，还可能会导致过度拟合。通常，在模型性能方面，您需要在较大数量的浅树和较小数量的深树之间做一些权衡。
* `subsample`：用于控制训练数据的采样。此方法有助于减少过度拟合，但如果设置得过低，也可能会让模型无数据可用。
* `eta`：用于控制每轮提升的激进程度。值越大，提升越保守。
* `gamma`：用于控制树生长的激进程度。值越大，模型越保守。

### 练习 4

使用 xgb.set_hyperparameters 来设置超参数。

In [None]:
# 设置超参数
xgb.set_hyperparameters(max_depth=5,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        objective='binary:logistic',
                        num_round=100)

然后，由于您使用 CSV 文件格式进行训练，请创建 s3_inputs，训练函数可以用它来指向 Amazon S3 中的文件。

In [None]:
# 配置数据输入
s3_input_train = sagemaker.inputs.TrainingInput(s3_data='s3://{}/{}/train'.format(bucket, prefix), content_type='csv')
s3_input_validation = sagemaker.inputs.TrainingInput(s3_data='s3://{}/{}/validation/'.format(bucket, prefix), content_type='csv')

最后，您已做好训练准备。

要训练模型，请使用 fit() 函数。

In [None]:
# 训练模型
xgb.fit({'train': s3_input_train, 'validation': s3_input_validation})

## 托管

现在，请创建一个模型，然后使用 SageMaker [评估程序](https://sagemaker.readthedocs.io/en/latest/estimators.html)的部署 API 将其部署到托管终端节点。 

### 练习 5 
配置 `initial_instance_count = 1` 和 `instance_type = ml.m5.xlarge`

In [None]:
# 部署模型
Variant = 'AllTraffic' # 配置默认部署变体
xgb_predictor = xgb.deploy(initial_instance_count=1,
                           instance_type='ml.m5.xlarge')


### 评估

现在，请通过发出 http POST 请求使用模型进行实时预测。 

首先要设置串行器和解串器，以便将 `test_data` 数组传递给终端节点后面的模型。

In [None]:
xgb_predictor.serializer = sagemaker.serializers.CSVSerializer()

使用简单的函数来执行以下操作：

1. 循环访问测试数据集
2. 将其拆分为小批量的行 
3. 将这些小批量行转换为 CSV 字符串负载
4. 通过调用 XGBoost 终端节点来检索小批量预测结果
5. 收集预测结果，并将您的模型提供的 CSV 输出转化为 NumPy 数组

In [None]:
def predict(data, rows=500):
    split_array = np.array_split(data, int(data.shape[0] / float(rows) + 1))
    predictions = ''
    for array in split_array:
        predictions = ','.join(
            [predictions, xgb_predictor.predict(array).decode('utf-8')])

    return np.fromstring(predictions[1:], sep=',')


predictions = predict(test_data.to_numpy()[:, 1:])

将实际值与预测值进行比较来评估 ML 模型的性能。

在本示例中，您将预测客户是流失了 (`1`) 还是没流失 (`0`)，这会生成一个简单的混淆矩阵。

In [None]:
pd.crosstab(index=test_data.iloc[:, 0], columns=np.round(
    predictions), rownames=['actual'], colnames=['predictions'])

_注意：由于算法中的随机因素，您的结果可能会略有不同。_

在 48 个流失客户中，模型正确地预测到了 39 个。（真阳性。） 对其中 4 位客户的预测不正确，即他们最终并未流失。（假阳性。） 

我们预测其中有 9 位客户不会流失，但他们最终却流失了。（假阴性。）

**重要提示：**由于上面的 `np.round()` 函数，我们使用的是简单的阈值（或截止值）– 0.5。 

我们从 `xgboost` 获取的预测结果显示为 0 到 1 之间的连续值，并强制采用我们开始时所说的二元分类。

## 自动优化模型

### XGBoost 的模型超参数优化

自动模型优化或*超参数优化*通过运行很多在数据集上测试一系列超参数的作业来查找模型的最佳版本。

您可以选择可优化超参数、每个超参数的值范围和目标指标。您可以从算法计算的指标中选择目标指标。

自动模型优化将搜索所选超参数，以找到有助于构建可优化目标指标的模型的值组合。

有关模型优化的更多信息，请参阅“[自动模型优化](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning.html)”。 

### 练习 6

HyperparameterTuner 对象通过评估程序来获取每项作业配置信息。

In [None]:
# 重新创建之前创建的评估程序
xgb = sagemaker.estimator.Estimator(container,
                                    role,
                                    train_instance_count=1,
                                    train_instance_type='ml.m5.xlarge',
                                    output_path='s3://{}/{}/output'.format(
                                        bucket, prefix),
                                    sagemaker_session=sess)

对于每个优化作业，您都可以配置一组静态超参数和可优化超参数的范围。 

使用 IntegerParameter、CategoricalParameter 和 ContinuousParameter 对象为 hyperparameter_ranges 变量分配值。（此变量属于字典数据类型。）

### 练习 7

设置作业的范围。

In [None]:
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter

xgb.set_hyperparameters(objective='binary:logistic',
                        num_round=10)
hyperparameter_ranges = {
    'eta': ContinuousParameter(0, 1),
    'min_child_weight': IntegerParameter(1, 10)
}

接下来，指定要优化的目标指标及其定义。该定义包括从训练作业的 CloudWatch 日志中提取该指标所需的正则表达式。

XGBoost 算法在训练过程中会计算出九个[指标](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost-tuning.html)，它们均可用作目标指标。

在本示例中，您只需指定指标名称，无需提供正则表达式。

### 练习 8

选择一个指标作为目标指标。选择一个适合二元分类的目标。

In [None]:
objective_metric_name='validation:error'

现在，我们将创建一个 HyperparameterTuner 对象，以向其传递以下内容：

* XGBoost 评估程序
* 超参数范围 
* 目标指标名称
* 优化任务配置

### 练习 9

创建 [HyperparameterTuner](https://sagemaker.readthedocs.io/en/latest/tuner.html) 对象。

必需的参数如下： 

* `estimator`
* `objective_metric_name`
* `hyperparameter_ranges`
* `objective_type`
* `max_jobs` 
* `max_parallel_jobs`


In [None]:
# 超参数优化作业
from sagemaker.tuner import HyperparameterTuner

tuner = HyperparameterTuner(estimator=xgb,
                            objective_metric_name=objective_metric_name,
                            hyperparameter_ranges=hyperparameter_ranges,
                            objective_type='Minimize',
                            max_jobs=9,
                            max_parallel_jobs=3)


#### 构建训练模型！

In [None]:
tuner.fit({'train': s3_input_train, 'validation': s3_input_validation})
tuner.wait()

使用超参数优化作业对模型进行优化后，请将新模型部署到之前创建的终端节点。


## 部署新的经过训练的模型

Amazon SageMaker 包含内置的 A/B 测试功能，可以帮助您测试模型并试验不同的版本以找到最佳模型。 

首先，请查看优化作业的结果。

要监控超参数优化作业的进度和完成情况，请查看 SageMaker 控制台的“超参数优化作业”部分。

此单元格将以编程方式查看超参数优化作业的状态。

In [None]:
from pprint import pprint
sage_client = sess.sagemaker_client
tuning_job_name = tuner.latest_tuning_job.job_name

# 运行此单元格以查看超参数优化作业的当前状态
tuning_job_result = sage_client.describe_hyper_parameter_tuning_job(
    HyperParameterTuningJobName=tuning_job_name)

status = tuning_job_result['HyperParameterTuningJobStatus']
if status != 'Completed':
    print("The tuning job has not been completed.")

job_count = tuning_job_result['TrainingJobStatusCounters']['Completed']
print("%d training jobs have completed\n" % job_count)

if tuning_job_result.get('BestTrainingJob', None):
    print("Best model found so far:")
    pprint(tuning_job_result['BestTrainingJob'])
else:
    print("No training jobs have reported results yet.")

训练作业完成后，请将最佳模型部署到终端节点。

In [None]:
# 使用最佳模型创建 SageMaker 模型
model_name = sess.create_model_from_job(training_job_name=tuner.best_training_job(),
                                        role=role)

在模型中创建新的 SageMaker 终端节点配置。

In [None]:
# 获取当前终端节点配置
endpoint = sage_client.describe_endpoint(EndpointName=xgb_predictor.endpoint)
endpoint_config = sage_client.describe_endpoint_config(
    EndpointConfigName=endpoint['EndpointConfigName'])

# 将当前部署权重更改为 0.5（我们会将 50% 的流量移至新模型）
current_model_config = endpoint_config['ProductionVariants'][0]
current_model_config['InitialVariantWeight'] = 0.5
current_model_config['VariantName'] = 'XGBoost'
Variant = 'TunedXGBoost'

tuned_model_config = {'ModelName': model_name,
                      'InitialInstanceCount': 1,
                      'InstanceType': 'ml.m5.xlarge',
                      'VariantName': Variant,
                      'InitialVariantWeight': 0.5}

# 创建新的终端节点配置
sage_client.create_endpoint_config(
    EndpointConfigName='AB-Config',
    ProductionVariants=[current_model_config,
                        tuned_model_config])

# 更新终端节点
sage_client.update_endpoint(
    EndpointName=endpoint['EndpointConfigName'],
    EndpointConfigName='AB-Config'
)
result = sess.wait_for_endpoint(endpoint['EndpointConfigName'])

在 SageMaker 控制台中，您将能够看到终端节点现在将 50% 的流量发送到旧模型，并将 50% 的流量发送到新模型。 

创建新的交叉表，并确保一切正常。

In [None]:
def predict(data, rows=500):
    split_array = np.array_split(data, int(data.shape[0] / float(rows) + 1))
    predictions = ''
    for array in split_array:
        predictions = ','.join(
            [predictions, xgb_predictor.predict(array).decode('utf-8')])

    return np.fromstring(predictions[1:], sep=',')

predictions = predict(test_data.as_matrix()[:, 1:])

pd.crosstab(index=test_data.iloc[:, 0], columns=np.round(
    predictions), rownames=['actual'], colnames=['predictions'])

如果一切正常（没有出现错误），您就可以将所有流量发送到新模型。

In [None]:
sage_client.update_endpoint_weights_and_capacities(
    EndpointName=endpoint['EndpointConfigName'],
    DesiredWeightsAndCapacities=[
        {
            'VariantName': Variant,
            'DesiredWeight': 1
        },
        {
            'VariantName': 'XGBoost',
            'DesiredWeight': 0
        }
    ]
)
response = sess.wait_for_endpoint(endpoint['EndpointConfigName'])

现在为新模型创建一个新的交叉表。

In [None]:
def predict(data, rows=500):
    split_array = np.array_split(data, int(data.shape[0] / float(rows) + 1))
    predictions = ''
    for array in split_array:
        predictions = ','.join(
            [predictions, xgb_predictor.predict(array).decode('utf-8')])

    return np.fromstring(predictions[1:], sep=',')

predictions = predict(test_data.as_matrix()[:, 1:])

pd.crosstab(index=test_data.iloc[:, 0], columns=np.round(
    predictions), rownames=['actual'], colnames=['predictions'])

## 让模型做好生产准备

### AWS Auto Scaling

当您的终端节点投入生产时，根据您的需求和预计流量，单个终端节点的吞吐量在负载较高的情况下可能会给用户带来不良体验。

在负载较高的情况下，终端节点的行为类似于 Web 服务器。当需要更多时间才能满足请求（终端节点的吞吐量在减少）时，终端节点可能会由于超时而开始发出错误，并且常规实例指标（例如 CPU 利用率）将达到最大利用率。

为了缓解这种情况，您可以预置更多实例来支持为模型提供服务的终端节点，但这不会动态地适应您正在接收的流量和负载的变化。

借助适用于 Amazon SageMaker 的 AWS Auto Scaling，您不必密切监控推理卷，也无需手动更改终端节点配置，只需配置 AWS Auto Scaling 使用的扩展策略即可。

Auto Scaling 会根据实际工作负载增加或减少实例的数量，该负载是使用策略中定义的 Amazon CloudWatch 指标和目标值确定的。

在为最后一个终端节点部署 AWS Auto Scaling 之前，要先监控该终端节点在负载下的行为，以了解要跟踪哪些指标。

创建两个帮助程序函数，通过发送带有一些示例数据的请求来负责对终端节点进行负载测试。

In [None]:
import botocore
import concurrent.futures
import requests
import time

NB_CONNECTIONS = 200

# 更新 SageMaker 运行时客户端所接受的池中的连接数
client = boto3.client('sagemaker-runtime',
                      config=botocore.client.Config(max_pool_connections=NB_CONNECTIONS))
sess.sagemaker_runtime_client = client

# 将用于预测的数据发送到终端节点
def send_data():
    return xgb_predictor.predict([i for i in range(66)])

# 将 nb_requests 发送到终端节点 nb_repetitions
def load_test_endpoint(nb_requests, nb_repetitions):
    for _ in range(nb_repetitions):
        out = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=NB_CONNECTIONS) as executor:
            future_submit = (executor.submit(send_data)
                             for _ in range(nb_requests))
            time1 = time.time()
            for future in concurrent.futures.as_completed(future_submit):
                try:
                    data = future.result()
                except Exception as exc:
                    data = str(type(exc))
                finally:
                    out.append(data)

                    print(str(len(out)), end="\r")

            time2 = time.time()

        # 打印 nb_requests 所用时间
        print(f'Took {time2-time1:.2f} s for {nb_requests} requests')

现在，使用帮助程序函数并批量发送 200 个请求。在这批请求中，有 180 个请求是重复的。平均而言，这需要不到一秒钟的时间。总运行时间将不超过 3 分钟。

In [None]:
load_test_endpoint(nb_requests=200, nb_repetitions=180)

使用 50000 个请求来测试您的终端节点，使其处于负载状态。

尽量使请求简短，并且 50000 个批量请求中只能有 1 个重复请求。

In [None]:
load_test_endpoint(nb_requests=50000, nb_repetitions=1)

现在，您有足够的数据点来衡量终端节点的性能，并且可以使用 CloudWatch 指标来展示其性能。

您可以探索适用于 SageMaker 终端节点的不同指标，并确保您可以查看由您执行的第一个负载测试生成的指标。

在 SageMaker 控制台中，导航到您的终端节点，然后访问监控部分中的链接。您也可以运行以下单元格并打开此链接：

In [None]:
from IPython.display import Markdown as md

link = 'https://' + boto3.Session().region_name + '.console.aws.amazon.com/cloudwatch/home?region=' + boto3.Session().region_name + \
    '#metricsV2:namespace=AWS/SageMaker;dimensions=EndpointName,VariantName;search=' + \
    xgb_predictor.endpoint

md("**Caution**: *Ctrl + click* the link to not loose the current tab with the notebook: [%s](%s)" % (link, link))

这样，您就可以密切监控终端节点的性能，使用在上面发现的一些指标在 Amazon CloudWatch 中创建控制面板，设置一小时内可处理的数据量，以及设置自动刷新（如果愿意）以避免手动刷新控制面板。

另外，我们还提供了一个 Amazon CloudWatch 控制面板模板，您可以使用 AWS CloudFormation 自动部署该模板。

请从当前文件夹中打开 `dashboard/template-cloudwatch.yaml`，下载该模板。转到 AWS CloudFormation，并选择<b> Create Stack（创建堆栈）</b>，然后通过选择已下载的模板来“将模板上传到 Amazon S3”。

使用 AWS CloudFormation 部署该模板时，您需要终端节点和变体名称。创建完毕后，您可以查看部署结果，以找到指向控制面板的直接 URL。

In [None]:
variant_name = Variant

print("Endpoint name: " + xgb_predictor.endpoint)
print("Variant name: " + variant_name)

请注意，在系统处理最后一批比较大的请求时，大多数指标都会上升。假设此终端节点定期为大量请求提供服务来确定客户是否有可能流失，并且我们想定义一个合理的值来维持所需的吞吐量。我们可以用每分钟请求数 (RPS) 来衡量此终端节点，并定义终端节点需要能够为最多 10000 个请求提供服务才能为所有客户提供答案，直到他们遇到延迟，甚至错误。

在下面，我们根据[负载测试指南](https://docs.aws.amazon.com/sagemaker/latest/dg/endpoint-scaling-loadtest.html)中的建议定义了一个简单的公式，以计算用于自动扩展的吞吐量值。

In [None]:
MAX_RPS = 10000 
SAFETY_FACTOR = .5

INVOCATIONS_PER_INSTANCE_THRESHOLD = (MAX_RPS * SAFETY_FACTOR)
print(INVOCATIONS_PER_INSTANCE_THRESHOLD)

请注意安全因素，这一点很重要，因为您需要考虑终端节点在执行负载测试时的行为可能有所不同的情况。此外，您还需要注意您无法控制的外部因素。

该公式的结果将用作每个实例的调用次数阈值，达到该阈值时将触发终端节点自动扩展并添加其他实例以支持终端节点。

要实现此目标，请使用 AWS Auto Scaling 注册一个可扩展目标，以定义要自动扩展的内容（例如，实例数量）以及最小和最大容量。 

建议您将最小和最大容量分别设置为 1 和 3。

### 练习 10

填写第一步的详细信息来设置 AWS Auto Scaling。您可以使用 Auto Scaling API 的 `register_scalable_target` [API 方法](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/application-autoscaling.html#ApplicationAutoScaling.Client.register_scalable_target)以及为 `ResourceId` 预先计算的 `resource_name`。

In [None]:
autoscaling_client = boto3.client('application-autoscaling')
resource_name = 'endpoint/' + xgb_predictor.endpoint + '/variant/' + variant_name

response = autoscaling_client.register_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=resource_name,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    MinCapacity=1,
    MaxCapacity=3
)

现在，终端节点的实例数量已注册为可扩展目标，可以自动扩展或缩减。

要启用此行为，请创建一个扩展策略，用于定义 AWS Auto Scaling 需要跟踪哪些指标以及根据什么值来进行扩展或缩减。

在本示例中，当每个实例的调用次数达到我们定义的阈值时，终端节点应该会进行扩展。

### 练习 11

在下面的代码中填写您要跟踪的指标和值。

您可以参阅有关 `put_scaling_policy` [API 方法](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/application-autoscaling.html#ApplicationAutoScaling.Client.put_scaling_policy)的更多信息，了解如何填写这些值。

In [None]:
response = autoscaling_client.put_scaling_policy(
    PolicyName='xgb-endpoint-throughput-threshold',
    ServiceNamespace='sagemaker',
    ResourceId=resource_name,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': INVOCATIONS_PER_INSTANCE_THRESHOLD,
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'
        },
        'ScaleOutCooldown': 200,
        'ScaleInCooldown': 200,
        'DisableScaleIn': False
    }
)

将终端节点再次置于高负载情况下，观察 AWS Auto Scaling 的激活情况。 

向终端节点再批量发送 50000 个请求，其中应有 10 个重复请求。

In [None]:
load_test_endpoint(nb_requests=50000, nb_repetitions=10)

通过查看之前创建的控制面板，您能否看到自动扩展功能是何时启用的？ 

每个实例的调用次数应该已经降至目标值以下，并且调用图现在将显示 2 行，分别针对 Invocations 和 InvocationsPerInstance。

这清楚地表明了该模型是由多个实例提供服务，并且负载有所降低。

如果您让上述单元格继续运行更长的时间，则可能会看到终端节点在缩减。它将恢复到单个实例的状况。

如果终端节点后面有多个实例，那么您的吞吐量平均要低于我们设定的目标。AWS Auto Scaling 已设置为每分钟跟踪 5000 个请求。如果有太多数据点少于 4000 个请求，则警报将触发自动扩展策略，此处就是这种情况。

在生产环境中，您将使用功能更强大的负载测试工具执行真正的负载测试，并确定终端节点在高负载情况下（直到它开始发出错误为止）可以维持的确切吞吐量（或其他指标）。

有关该主题的更多信息，请参阅[博客文章](https://aws.amazon.com/blogs/machine-learning/load-test-and-optimize-an-amazon-sagemaker-endpoint-using-automatic-scaling/)。 

### 错误的相对成本

#### 二元分类截止

现在让我们回到模型评估并选择一个截止值，以将模型预测转换为二元决策。

与留住*可能*流失的客户的成本相比，应对流失客户的成本要高一些，因此请考虑调整此截止值，以最大限度地减少高昂的假阴性情况。这可能会导致假阳性客户的数量有所增加，但真阳性客户的数量也应该会增加，同时假阴性客户的数量会有所减少。

请查看预测结果的连续值，以在此处大致了解一下。

In [None]:
plt.hist(predictions)
plt.show()

来自模型的连续值预测结果倾向于偏向 0 或 1，但有大量客户的值是在 0.1 到 0.9 之间。调整截止值会改变很多客户预测结果。 

将截止值为 0.5 的原始混淆矩阵与截止值为 0.3 的混淆矩阵进行比较。

In [None]:
display(pd.crosstab(index=test_data.iloc[:, 0], columns=np.round(
    predictions), rownames=['actual'], colnames=['predictions']))
display(pd.crosstab(index=test_data.iloc[:, 0], columns=np.where(
    predictions > 0.3, 1, 0), rownames=['actual'], colnames=['predictions']))

我们可以看到，将截止值从 0.5 改为 0.3 导致真阴性客户的数量减少两个，真阳性客户的数量增加七个，假阳性客户的数量增加两个，以及假阴性客户的数量减少七个。 

这些数字可能看起来很小，但是总体而言，由于截止值的更改，有 5% 的客户的预测结果出现了变化。 

这一决定是否正确？  

虽然您最终可能需要留住的客户增加了七个，但需要激励的客户少了两个，因为即使您没有采取激励措施，这两个客户也会留下来。 

确定最佳截止值是在现实环境中正确应用机器学习的关键一步。

#### 使输出实现标准化

现在，让我们定义一个帮助程序函数，除了用于计算上面的混淆矩阵之外，还可以对输出进行“标准化”处理。 

现在，您可以查看百分比，而不是绝对计数。

In [None]:
def confusion_matrix(test_data, predictions, cutoff):
    conf_matrix = pd.crosstab(index=test_data.iloc[:, 0], columns=np.where(
        predictions > cutoff, 1, 0), rownames=['actual'], colnames=['predictions'])
    conf_matrix_normalized = conf_matrix/conf_matrix.values.sum()
    return conf_matrix_normalized

confusion_matrix(test_data, predictions, 0.5)

#### 考虑错误的相对成本

任何实际的二元分类问题都可能产生类似的敏感截止值。这本身并不是问题。毕竟，如果两个分类的分数非常很容易分开的话，这个问题可能一开始就不是很难。您甚至可以使用简单的规则（而不使用 ML）来解决该问题。

更重要的是，如果您把 ML 模型投入生产，*您将不得不在假阳性客户和假阴性客户上投入成本。*

请考虑与真阳性和真阴性的正确预测结果相关的类似成本。截止值的选择会影响到所有这四种统计数据。请注意，每项预测的这四个结果中的每一个都会给企业带来相对成本。

#### 分配成本

移动电话服务提供商的客户流失成本取决于企业采取的具体措施。

我们在这里做一些假设。

首先，为真阴性客户分配 0 USD 的成本。在本示例中，您的模型实质上可以正确识别出满意的客户，因此您无需采取任何措施。

假阴性结果最成问题，因为这种预测结果错误地预测到客户会留下来，而事实上他们却流失了。您失去了这些客户，还必须支付赢得替代客户所需的所有成本。

这些成本包括：

* 原本可以获得的收入
* 广告成本
* 管理成本
* 销售点成本
* 电话硬件补贴

在互联网上快速搜索一番后，您会发现这些成本通常为数百美元。因此，在本示例中，让我们假设这些成本为 500 USD。这是假阴性结果的成本。

最后，对于被您的模型认定为会流失的客户，让我们假设客户维系激励措施的成本为  
100 USD。如果我的服务提供商向我提供了这样的优惠，那么我肯定会三思而后行。这是真阳性和假阳性结果的成本。

如果出现假阳性（客户很满意，但模型错误地预测该客户会流失），我们的 100 USD 便“浪费”了。我们或许可以更有效地花费这 100 USD，但有可能提高了已经很忠诚的客户的忠诚度，所以结果还不错。

#### 找到最佳截止值

很显然，假阴性结果的成本比假阳性结果的成本高得多。我们不能根据客户数量针对错误进行优化，而应该最大限度地减小成本函数，如下所示：

```txt
$500 * FN(C) + $0 * TN(C) + $100 * FP(C) + $100 * TP(C)
```

FN(C) 表示假阴性百分比是截止值 C 的函数，并且对于 TN、FP 和 TP 都是相似的。您需要找到截止值 C，此时表达式的结果最小。

一种简单的方法是对大量可能的截止值运行模拟。 

下面，我们将在 for 循环中测试 100 个可能的值。

In [None]:
COST_OF_ERRORS = np.array([[0, 100], [500, 100]])

cutoffs = np.arange(0.01, 1, 0.01)
costs = []
for c in cutoffs:
    cost = (COST_OF_ERRORS * confusion_matrix(test_data,
                                              predictions, c)).values.sum()
    costs.append(cost)

costs = np.array(costs)
plt.plot(cutoffs, costs)
plt.show()
print('Cost is minimized near a cutoff of:',
      cutoffs[np.argmin(costs)], 'for a cost of:', np.min(costs))

上图显示，如果您设置的阈值过低，所有客户均将获得客户维系激励优惠，因此您的成本会直线上升。

同时，将阈值设置得太高会导致过多的客户流失，因此最终损失几乎相同。

通过将截止值设置为 0.29，可以将总成本降至最低 24.55 USD。

与根本不使用 ML 模型相比，这又有何不同？ 如果不使用 ML 模型，您不用采取任何激励措施，但同时会失去将会流失的所有客户。

假设相应的混淆矩阵如下所示：

In [None]:
no_ml_confusion_matrix = np.array([[0.855086, 0], [1-0.855086, 0]])

display((COST_OF_ERRORS * no_ml_confusion_matrix).sum())

在不使用 ML 模型且维持相同客户群的情况下，每位客户的成本要高得多：72.46 USD 与 24.55 USD


## 扩展

本笔记本向您展示了如何构建模型来预测客户是否有可能流失，以及如何设置考虑了真阳性，假阳性和假阴性结果成本的最佳阈值。 

您可以通过以下几种方式对其进行扩展，包括：

* 一些获得了客户维系激励优惠的客户仍会流失。在您的成本函数中包含已获得激励优惠但仍流失的可能性将为我们的客户维系计划提供更好的投资回报率。
* 切换到价格更低的套餐或者停用付费功能的客户代表不同类型的流失，您可以分别为其构建模型。
* 为客户行为的演变情况构建模型。如果使用率在下降，并且给客户服务人员打电话的次数在增加，在趋势相反时，您的客户更有可能会流失。客户资料应包含行为趋势。
* 实际的训练数据和货币成本分配可能更加复杂。
* 对于每种类型的客户流失，您可能需要多个模型。

不管额外的复杂性如何，本笔记本中描述的相似原则都可能适用。

### 清理

如果您已经使用完此笔记本，请运行以下单元格来删除托管终端节点，以免未关闭的闲置实例产生任何费用。

In [None]:
sagemaker.Session().delete_endpoint(xgb_predictor.endpoint)