## Task 2: Perform data processing with SageMaker Processing

In this notebook, you set up the environment needed to run a basic Apache Spark application using Amazon SageMaker Processing. By using Apache Spark on SageMaker Processing, you can run Spark jobs without having to provision an Amazon EMR cluster. You then define and run a Spark job using the **PySparkProcessor** class from the **SageMaker Python SDK**. Finally, you validate the data processing results saved in Amazon Simple Storage Service (Amazon S3).

The processing script performs some basic data processing, such as string indexing, one-hot encoding, vector assembly, and an 80-20 split of the processed data into training and validation datasets.
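
To make these four steps concrete, the following is a minimal pure-Python sketch of what they do to a tiny dataset. It is not the lab's **pyspark_preprocessing.py** script and it does not use Spark. The function names, the sample rows, and the frequency-based index ordering are assumptions chosen for illustration, and the split here is exact, whereas a random split in Spark is typically approximate.

```python
import random
from collections import Counter


def string_index(values):
    """Map each category to an integer index, most frequent category first."""
    counts = Counter(values)
    # Sort by descending frequency, then alphabetically so ties are deterministic
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {category: idx for idx, category in enumerate(ordered)}


def one_hot(index, size):
    """Return a list of length `size` with 1.0 at position `index`."""
    return [1.0 if i == index else 0.0 for i in range(size)]


def preprocess(rows, seed=42, train_fraction=0.8):
    """rows: list of (category, numeric_value) tuples. Returns (train, validation)."""
    mapping = string_index([category for category, _ in rows])
    # Vector assembly: concatenate the one-hot vector with the numeric feature
    features = [one_hot(mapping[c], len(mapping)) + [float(x)] for c, x in rows]
    shuffled = features[:]
    random.Random(seed).shuffle(shuffled)
    cut = int(len(shuffled) * train_fraction)
    return shuffled[:cut], shuffled[cut:]


# Hypothetical sample rows: (workclass, age)
rows = [("Private", 39), ("State-gov", 50), ("Private", 38), ("Private", 53),
        ("Self-emp", 28), ("Private", 37), ("State-gov", 49), ("Private", 52),
        ("Self-emp", 31), ("Private", 42)]
train, validation = preprocess(rows)
print(len(train), len(validation))
```

Each output row is a single feature vector: the one-hot encoding of the categorical column followed by the numeric column.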

### Task 2.1: Set up the environment

Install the latest SageMaker Python SDK package and other dependencies.

In [None]:
%%capture
%pip install awscli --upgrade
%pip install boto3 --upgrade
%pip install -U "sagemaker>2.0"

After upgrading the SDK, restart your notebook kernel. 

1. Choose the **Restart kernel** icon from the notebook toolbar.


Now, import the required libraries, get the execution role to run the SageMaker processing job, and set up the Amazon S3 bucket to store the Spark job outputs.


In [None]:
#install-dependencies
import logging
import boto3
import sagemaker
import pandas as pd
from sagemaker.s3 import S3Downloader
from time import gmtime, strftime

sagemaker_logger = logging.getLogger("sagemaker")
sagemaker_logger.setLevel(logging.INFO)
sagemaker_logger.addHandler(logging.StreamHandler())

#Execution role to run the SageMaker Processing job
role = sagemaker.get_execution_role()
print("SageMaker Execution Role: ", role)

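#S3 bucket that holds the input data and receives the processing job outputs
s3 = boto3.resource('s3')
bucket = None
for candidate in s3.buckets.all():
    if 'labdatabucket' in candidate.name:
        bucket = candidate.name
#Fail early with a clear message instead of a NameError if no bucket matches
if bucket is None:
    raise RuntimeError("No S3 bucket with 'labdatabucket' in its name was found.")
print("Bucket: ", bucket)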

<i aria-hidden="true" class="fas fa-sticky-note" style="color:#563377"></i> **Note:** If you get an error, make sure that you restarted the notebook kernel by choosing the **Restart kernel** icon from the notebook toolbar, and then rerun the cell. You can ignore any warnings from the preceding cell.

### Task 2.2: Run the SageMaker processing job

In this task, you first import and review the input dataset. You then define and run the Spark processing job.

In [None]:
#import-data
prefix = 'data/input'

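# Download the input dataset from Amazon S3 to the local data/ directory
S3Downloader.download(s3_uri=f"s3://{bucket}/{prefix}/spark_adult_data.csv", local_path='data/')

# The file has no header row, so the columns are numbered
adult_df = pd.read_csv("data/spark_adult_data.csv", header=None)
adult_df.sample(5)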

Next, create an instance of the SageMaker **PySparkProcessor** class to define and run a Spark application as a processing job. Refer to [SageMaker Spark PySparkProcessor](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.spark.processing.PySparkProcessor) for more information about this class.

To create the PySparkProcessor instance, configure the following parameters:
- **base_job_name**: Prefix for the processing job name
- **framework_version**: Spark version that the processing job uses
- **role**: SageMaker execution role
- **instance_count**: Number of instances that run the processing job
- **instance_type**: Type of Amazon Elastic Compute Cloud (Amazon EC2) instance used for the processing job
- **max_runtime_in_seconds**: Maximum time, in seconds, that the processing job can run before it is stopped

In [None]:
#pyspark-processor
from sagemaker.spark.processing import PySparkProcessor

# create a PySparkProcessor
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-preprocessor",
    framework_version="3.1", # Spark version
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200
)

Next, you use the PySparkProcessor run method to run the **pyspark_preprocessing.py** script as a processing job. Refer to [PySparkProcessor run method](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html#sagemaker.spark.processing.PySparkProcessor.run) for more information about this method. For this lab, data transformations such as string indexing and one-hot encoding are performed on the categorical features.

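To run the processing job, configure the following parameters:
- **submit_app**: Path of the preprocessing script
- **outputs**: Outputs of the processing job, each mapping a local path in the processing container to an Amazon S3 destination
- **arguments**: Command-line arguments to the preprocessing script (such as the Amazon S3 input and output locations)
- **spark_event_logs_s3_uri**: Amazon S3 location where the Spark event logs are published
- **logs**: Whether to show the logs produced by the job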

The processing job takes approximately 5 minutes to complete. While the job is running, you can review the source for the preprocessing script (which has been preconfigured as part of this lab) by opening the **pyspark_preprocessing.py** file from the file browser.
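
The values in **arguments** reach the script as ordinary command-line arguments. As a rough illustration, a script could read them with `argparse` as sketched here. The flag names are the ones that this lab passes to the run method, but the function name `parse_args`, the placeholder bucket name, and the overall structure are assumptions. The actual **pyspark_preprocessing.py** file may handle its arguments differently.

```python
import argparse


def parse_args(argv=None):
    """Parse the four S3 location arguments passed by the processing job."""
    parser = argparse.ArgumentParser(
        description="Hypothetical argument parser for a preprocessing script"
    )
    parser.add_argument("--s3_input_bucket", type=str, required=True)
    parser.add_argument("--s3_input_key_prefix", type=str, required=True)
    parser.add_argument("--s3_output_bucket", type=str, required=True)
    parser.add_argument("--s3_output_key_prefix", type=str, required=True)
    # argv=None makes argparse read sys.argv, which is what a real script would do
    return parser.parse_args(argv)


# Example invocation with a placeholder bucket name
args = parse_args([
    "--s3_input_bucket", "example-bucket",
    "--s3_input_key_prefix", "data/input",
    "--s3_output_bucket", "example-bucket",
    "--s3_output_key_prefix", "data/output",
])
input_uri = f"s3://{args.s3_input_bucket}/{args.s3_input_key_prefix}/"
print(input_uri)
```

Passing the locations as arguments keeps bucket names out of the script, so the same script can run against a different bucket without changes.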

In [None]:
#processing-job
from sagemaker.processing import ProcessingOutput

# Amazon S3 path prefixes
input_raw_data_prefix = "data/input"
output_preprocessed_data_prefix = "data/output"
logs_prefix = "logs"

# Run the processing job
spark_processor.run(
    submit_app="pyspark_preprocessing.py",
    outputs=[
        ProcessingOutput(output_name="train_data",
                         source="/opt/ml/processing/train",
                         destination=f"s3://{bucket}/{output_preprocessed_data_prefix}/train"),
        ProcessingOutput(output_name="validation_data",
                         source="/opt/ml/processing/validation",
                         destination=f"s3://{bucket}/{output_preprocessed_data_prefix}/validation"),
    ],
    arguments=[
        "--s3_input_bucket", bucket,
        "--s3_input_key_prefix", input_raw_data_prefix,
        "--s3_output_bucket", bucket,
        "--s3_output_key_prefix", output_preprocessed_data_prefix],
    spark_event_logs_s3_uri=f"s3://{bucket}/{logs_prefix}/spark_event_logs",
    logs=True
)

print("Spark Processing Job Completed.")

<i aria-hidden="true" class="fas fa-sticky-note" style="color:#563377"></i> **Note:** While the processing job is running, you can also monitor the job progress from the SageMaker AI console. To monitor the processing job: 

1. Navigate to the SageMaker AI console. 

2. From the left pane, select **Processing**, then **Processing jobs**.

3. Locate the processing job whose name starts with **sm-spark-preprocessor-**.

4. After the processing job completes, return to this notebook. 
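
As an alternative to the console, the job status can also be checked programmatically. The following is a minimal sketch that uses the `list_processing_jobs` operation of the boto3 SageMaker client. The helper name `latest_job_status` is hypothetical. Because the cell that runs the job blocks until the job finishes, this would need to run from a separate notebook or terminal that has AWS credentials.

```python
def latest_job_status(sagemaker_client, name_prefix="sm-spark-preprocessor"):
    """Return (job_name, status) for the newest matching processing job, or None."""
    response = sagemaker_client.list_processing_jobs(
        NameContains=name_prefix,
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    summaries = response.get("ProcessingJobSummaries", [])
    if not summaries:
        return None
    job = summaries[0]
    return job["ProcessingJobName"], job["ProcessingJobStatus"]


# Usage (requires AWS credentials and permissions to list processing jobs):
# import boto3
# print(latest_job_status(boto3.client("sagemaker")))
```

The client is passed in as a parameter so that the helper can be exercised with a stub object when no AWS account is available.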

### Task 2.3: Validate the data processing results

Validate the output of the data processing job that you ran by reviewing the first five rows of the train and validation output datasets.

In [None]:
#view-train-dataset
print("Top 5 rows from s3://{}/{}/train/".format(bucket, output_preprocessed_data_prefix))
!aws s3 cp --quiet s3://$bucket/$output_preprocessed_data_prefix/train/train_features.csv - | head -n5

In [None]:
#view-validation-dataset
print("Top 5 rows from s3://{}/{}/validation/".format(bucket, output_preprocessed_data_prefix))
!aws s3 cp --quiet s3://$bucket/$output_preprocessed_data_prefix/validation/validation_features.csv - | head -n5
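
Beyond inspecting the first rows, you could also check that the split is roughly 80-20 by comparing row counts. The following is a small sketch under two assumptions: the output files have been downloaded locally, and they contain no header row. The helper names and the in-memory sample lines are hypothetical and stand in for the real files.

```python
import csv


def count_rows(lines):
    """Count non-empty CSV rows in an iterable of text lines (for example, an open file)."""
    return sum(1 for row in csv.reader(lines) if row)


def train_fraction(train_lines, validation_lines):
    """Return the share of all rows that belong to the training set."""
    train_rows = count_rows(train_lines)
    validation_rows = count_rows(validation_lines)
    total = train_rows + validation_rows
    if total == 0:
        raise ValueError("Both inputs are empty")
    return train_rows / total


# In-memory stand-ins for the two files; with real files, pass open file objects:
# with open("train_features.csv", newline="") as t, open("validation_features.csv", newline="") as v:
#     print(train_fraction(t, v))
sample_train = ["1,0,39"] * 8
sample_validation = ["0,1,50"] * 2
print(train_fraction(sample_train, sample_validation))
```

With a randomized split, expect the fraction on the real data to be close to 0.8 rather than exactly 0.8.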

### Conclusion

Congratulations! You have successfully used the SageMaker Python SDK to create and run a Spark processing job with SageMaker Processing.

### Cleanup

You have completed this notebook. To move to the next part of the lab, do the following:

- Close this notebook file.
- Return to the lab session and continue with the **Conclusion**.