# 任务 2：使用 SageMaker Processing 执行数据处理

在此笔记本中，您将使用 Amazon SageMaker Processing 设置运行基本 Apache Spark 应用程序所需的环境。通过在 SageMaker Processing 上使用 Apache Spark，您可以运行 Spark 作业，而无需预置 Amazon EMR 集群。然后，您可以使用 **SageMaker Python SDK** 中的 **PySparkProcessor** 类来定义并运行 Spark 作业。最后，您将验证保存在 Amazon Simple Storage Service (Amazon S3) 中的数据处理结果。

处理脚本会执行一些基本的数据处理，如字符串索引、独热编码、向量组装和以 80-20 分割率将处理过的数据分割成训练和验证数据集。    

## 任务 2.1：环境设置

安装最新的 SageMaker Python SDK 程序包和其他依赖项。

In [None]:
%%capture
%pip install awscli --upgrade
%pip install boto3 --upgrade
%pip install -U "sagemaker>2.0"

1.升级 SDK 后，通过从笔记本工具栏中选择 **Restart kernel**（重启内核）图标来重启笔记本内核。


现在，导入所需的库，让执行角色运行 SageMaker 处理作业，并设置 Amazon S3 存储桶来存储 Spark 作业输出。


In [None]:
#install-dependencies
import logging
import boto3
import sagemaker
import pandas as pd
import numpy as np
from sagemaker.s3 import S3Downloader
from time import gmtime, strftime

sagemaker_logger = logging.getLogger("sagemaker")
sagemaker_logger.setLevel(logging.INFO)
sagemaker_logger.addHandler(logging.StreamHandler())

sagemaker_session = sagemaker.Session()

#Execution role to run the SageMaker Processing job
role = sagemaker.get_execution_role()
print("SageMaker Execution Role: ", role)

#S3 bucket to read the Spark processing script and writing processing job outputs
s3 = boto3.resource('s3')
for buckets in s3.buckets.all():
    if 'labdatabucket' in buckets.name:
        bucket = buckets.name
print("Bucket: ", bucket)

如果您遇到错误，请确保通过从笔记本工具栏中选择 **Restart kernel**（重启内核）图标重启了笔记本内核。然后，重新运行单元格。

## 任务 2.2：运行 SageMaker 处理作业

在此任务中，您将导入并查看预处理数据集。

In [None]:
#import-data
prefix = 'data/input'

S3Downloader.download(s3_uri=f"s3://{bucket}/{prefix}/spark_adult_data.csv", local_path= 'data/')

shape=pd.read_csv("data/spark_adult_data.csv", header=None)
shape.sample(5)

接着，创建 Sagemaker Spark PySparkProcessor 类来定义一个 spark 应用程序并将其作为处理作业运行。有关此类的更多信息，请参阅 [Sagemaker Spark PySparkProcessor](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.spark.processing.PySparkProcessor)。

要创建 PySparkProcessor 类，请配置以下参数：
- **base_job_name**：处理作业名称的前缀
- **framework_version**：SageMaker PySpark 版本
- **role**：SageMaker 执行角色
- **instance_count**：要运行处理作业的实例的数量
- **instance_type**：用于处理作业的 Amazon Elastic Compute Cloud (Amazon EC2) 实例的类型

In [None]:
#pyspark-processor
from sagemaker.spark.processing import PySparkProcessor

# create a PySparkProcessor
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-preprocessor",
    framework_version="3.1", # Spark version
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200
)

接着，您将使用 PySparkProcessor run 方法将 **pyspark_preprocessing.py** 脚本作为处理作业运行。有关此方法的更多信息，请参阅 [PySparkProcessor run 方法](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.spark.processing.PySparkProcessor.run)。在本实验中，您将对分类特征执行字符串索引和独热编码等数据转换。

要运行处理作业，请配置以下参数：
- **submit_app**：预处理脚本的路径 
- **outputs**：预处理脚本的输出路径（Amazon S3 输出位置）
- **arguments**：预处理脚本的命令行参数（如 Amazon S3 输入和输出位置）

完成处理作业大约需要 5 分钟。在作业运行时，您可以通过从文件浏览器打开 **pyspark_preprocessing.py** 文件来查看预处理脚本（已作为本实验的一部分进行预先配置）的源代码。

In [None]:
#processing-job
import os
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Amazon S3 path prefix
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
input_raw_data_prefix = "data/input"
output_preprocessed_data_prefix = "data/output"
scripts_prefix = "scripts/smstudiofiles"
logs_prefix = "logs"

# Run the processing job
spark_processor.run(
    submit_app='s3://' + os.path.join(bucket, scripts_prefix, "pyspark_preprocessing.py"),
    outputs=[
        ProcessingOutput(output_name="train_data", 
                         source="/opt/ml/processing/train",
                         destination="s3://" + os.path.join(bucket, output_preprocessed_data_prefix, "train")),
        ProcessingOutput(output_name="validation_data", 
                         source="/opt/ml/processing/validation",
                         destination="s3://" + os.path.join(bucket, output_preprocessed_data_prefix, "validation")),
    ],
    arguments=[
        "--s3_input_bucket", bucket,
        "--s3_input_key_prefix", input_raw_data_prefix,
        "--s3_output_bucket", bucket,
        "--s3_output_key_prefix", output_preprocessed_data_prefix],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, logs_prefix),
    logs=True
)

print("Spark Processing Job Completed.")

## 任务 2.3：验证数据处理结果

通过查看训练和验证输出数据集的前五行，验证您运行的数据处理作业的输出。

In [None]:
#view-train-dataset
print("Top 5 rows from s3://{}/{}/train/".format(bucket, output_preprocessed_data_prefix))
!aws s3 cp --quiet s3://$bucket/$output_preprocessed_data_prefix/train/train_features.csv - | head -n5

In [None]:
#view-validation-dataset
print("Top 5 rows from s3://{}/{}/validation/".format(bucket, output_preprocessed_data_prefix))
!aws s3 cp --quiet s3://$bucket/$output_preprocessed_data_prefix/validation/validation_features.csv - | head -n5

### 总结

恭喜！ 您已借助 SageMaker Processing 成功使用 SageMaker Python SDK 创建了 Spark 处理作业并运行了该处理作业。

本实验的下一项任务侧重于使用 SageMaker Processing 和内置的 scikit-learn 容器执行数据处理。

### 清理

您已完成此笔记本。要进入本实验的下一部分，请执行以下操作：

- 关闭此笔记本文件。
- 返回至实验会话，继续执行**任务 3：使用 SageMaker Processing 和内置的 scikit-learn 容器执行数据处理**。