# Feature Transformation with an Amazon SageMaker Processing Job and SparkML

Typically, a machine learning (ML) process consists of a few steps: gathering data with various ETL jobs, pre-processing the data, and featurizing the dataset by incorporating standard techniques or prior knowledge.

Often, distributed data processing frameworks such as Spark are used to pre-process data sets in order to prepare them for training. In this notebook we'll use Amazon SageMaker Processing, and leverage the power of Spark in a managed SageMaker environment to run our processing workload.

![](img/prepare_dataset.png)

![](img/processing.jpg)


## Contents

1. Setup
1. Use Amazon SageMaker Processing Job to execute a Spark ML Transformation
1. Setup Input Data
1. Setup Output Data
1. Build a Spark container for running the processing job
1. Run the Processing Job using Amazon SageMaker
1. Inspect the Processed Output Data

## Setup

Add the following policy to your SageMaker Execution Role:
* Service: `EC2ContainerRegistry`
* Permissions: `List`, `Read`, `Write`
* Repository: `amazon-reviews-spark-processor`

Let's start by specifying:
* The S3 bucket and prefixes that you use for training and model data. Use the default bucket specified by the Amazon SageMaker session.
* The IAM role ARN used to give processing and training access to the dataset.

In [None]:
import sagemaker
import boto3

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name

# Using Amazon SageMaker Processing Job to Run a Spark ML Transformation

Retrieve the S3 output prefix of the previous Scikit Processing Job. We use it to construct the input paths for our Spark ML Processing Job.

# Setup Input Data

In [None]:
%store -r scikit_processing_job_s3_output_prefix

In [None]:
print('Previous Scikit Processing Job S3 Output Prefix: {}'.format(scikit_processing_job_s3_output_prefix))

In [None]:
!aws s3 ls s3://$bucket/$scikit_processing_job_s3_output_prefix/output/

In [None]:
balanced_train_data_input = 's3://{}/{}/output/raw-labeled-split-balanced-header-train/'.format(bucket, scikit_processing_job_s3_output_prefix)
balanced_validation_data_input = 's3://{}/{}/output/raw-labeled-split-balanced-header-validation/'.format(bucket, scikit_processing_job_s3_output_prefix)
balanced_test_data_input = 's3://{}/{}/output/raw-labeled-split-balanced-header-test/'.format(bucket, scikit_processing_job_s3_output_prefix)

In [None]:
!aws s3 ls $balanced_train_data_input

In [None]:
!aws s3 ls $balanced_validation_data_input

In [None]:
!aws s3 ls $balanced_test_data_input

# Setup Output Data

In [None]:
from time import gmtime, strftime
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

output_prefix = 'amazon-reviews-spark-processor-{}'.format(timestamp_prefix)

In [None]:
balanced_train_data_tfidf_output = 's3://{}/{}/output/tfidf-labeled-split-balanced-noheader-train'.format(bucket, output_prefix)
balanced_validation_data_tfidf_output = 's3://{}/{}/output/tfidf-labeled-split-balanced-noheader-validation'.format(bucket, output_prefix)
balanced_test_data_tfidf_output = 's3://{}/{}/output/tfidf-labeled-split-balanced-noheader-test'.format(bucket, output_prefix)

print(balanced_train_data_tfidf_output)
print(balanced_validation_data_tfidf_output)
print(balanced_test_data_tfidf_output)

# Build a Spark Docker Image to Run the Processing Job

An example Spark container is included in the `./container` directory of this example. The container handles the bootstrapping of all Spark configuration, and serves as a wrapper around the `spark-submit` CLI. At a high level the container provides:
* A set of default Spark/YARN/Hadoop configurations
* A bootstrapping script for configuring and starting up Spark master/worker nodes
* A wrapper around the `spark-submit` CLI to submit a Spark application
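
To make the last point concrete, here is a minimal sketch of what a wrapper around `spark-submit` might do. It is not the code shipped in `./container`; the function name `build_spark_submit_command`, its defaults, and the example paths are all hypothetical. It only shows how the application path and its arguments are appended after the `spark-submit` options.

```python
import shlex
from typing import List, Optional


def build_spark_submit_command(
    app_path: str,
    app_args: List[str],
    master: str = "yarn",
    deploy_mode: str = "client",
    py_files: Optional[List[str]] = None,
) -> List[str]:
    """Assemble a spark-submit argument list (illustrative helper, not the container's code)."""
    command = ["spark-submit", "--master", master, "--deploy-mode", deploy_mode]
    if py_files:
        # spark-submit expects one comma-separated value for --py-files.
        command += ["--py-files", ",".join(py_files)]
    # The application and its own arguments come after all spark-submit options.
    command.append(app_path)
    command.extend(app_args)
    return command


# Hypothetical script location and arguments, for illustration only.
cmd = build_spark_submit_command(
    "/opt/ml/processing/input/code/preprocess.py",
    ["s3_input_balanced_train_data", "s3://example-bucket/train/"],
)
print(shlex.join(cmd))
```

A real wrapper would typically hand such a list to `subprocess.run` once the Spark cluster has been bootstrapped.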


After the container build and push process is complete, use the Amazon SageMaker Python SDK to submit a managed, distributed Spark application that performs our dataset processing.

Build the example Spark container.

In [None]:
docker_repo = 'amazon-reviews-spark-processor'
docker_tag = 'latest'

In [None]:
!docker build -t $docker_repo:$docker_tag -f container/Dockerfile ./container

Create an Amazon Elastic Container Registry (Amazon ECR) repository for the Spark container and push the image.

In [None]:
import boto3
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name

image_uri = '{}.dkr.ecr.{}.amazonaws.com/{}:{}'.format(account_id, region, docker_repo, docker_tag)
print(image_uri)

Log in to Amazon ECR, create the repository if it does not already exist, then tag and push the Docker image.

In [None]:
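# `aws ecr get-login` was removed in AWS CLI v2; `get-login-password` works in both v1 (recent releases) and v2.
!aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.amazonaws.com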

In [None]:
!aws ecr describe-repositories --repository-names $docker_repo || aws ecr create-repository --repository-name $docker_repo

In [None]:
!docker tag $docker_repo:$docker_tag $image_uri

In [None]:
!docker push $image_uri

# Run the Job using Amazon SageMaker Processing Jobs

Next, use the Amazon SageMaker Python SDK to submit a processing job. Use the Spark container that was just built, and a SparkML script for processing in the job configuration.

Review the Spark processing script.

In [None]:
!cat preprocess-spark-text-to-tfidf-depends-on-scikit.py

In [None]:
from sagemaker.processing import ScriptProcessor

processor = ScriptProcessor(base_job_name='spark-amazon-reviews-processor',
                            image_uri=image_uri,
                            command=['/opt/program/submit'],
                            role=role,
                            instance_count=2, # instance_count needs to be > 1 or you will see the following error:  "INFO yarn.Client: Application report for application_ (state: ACCEPTED)"
                            instance_type='ml.r5.8xlarge',
                            env={'mode': 'python'})

# Start the Spark Processing Job

_Notes on Invoking from Lambda:_
* If we submit the job with the boto3 SDK instead (e.g. from a Lambda function), we need to copy the processing script (e.g. `preprocess-text-to-tfidf.py`) to S3 ourselves and specify everything explicitly, including `--py-files`.
* We would need to do the following before invoking the Lambda:
     !aws s3 cp preprocess.py s3://<location>/sagemaker/spark-preprocess-reviews-demo/code/preprocess.py
     !aws s3 cp preprocess.py s3://<location>/sagemaker/spark-preprocess-reviews-demo/py_files/preprocess.py
* Then reference the s3://<location> above in the --py-files, etc.
* See Lambda example code in this same project for more details.
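
As a rough illustration of the Lambda notes above, the sketch below builds a request for the boto3 `create_processing_job` call. It is not the Lambda code from this project. The helper name `build_processing_job_request`, the job name, the role ARN, the image URI, and the volume size are hypothetical. It also assumes that the container entrypoint takes the local script path as its first argument, and it leaves out `--py-files` because the way the container's `/opt/program/submit` consumes that option is not shown here.

```python
from typing import Dict, List


def build_processing_job_request(
    job_name: str,
    role_arn: str,
    image_uri: str,
    code_s3_uri: str,
    arguments: List[str],
    instance_count: int = 2,
    instance_type: str = "ml.r5.8xlarge",
) -> Dict:
    """Build keyword arguments for sagemaker_client.create_processing_job (illustrative)."""
    local_code_dir = "/opt/ml/processing/input/code"
    code_filename = code_s3_uri.rsplit("/", 1)[-1]
    return {
        "ProcessingJobName": job_name,
        "RoleArn": role_arn,
        "AppSpecification": {
            "ImageUri": image_uri,
            # Assumption: the entrypoint receives the local script path as its first argument.
            "ContainerEntrypoint": ["/opt/program/submit", f"{local_code_dir}/{code_filename}"],
            "ContainerArguments": arguments,
        },
        "Environment": {"mode": "python"},
        # The script previously copied to S3 is downloaded into the container.
        "ProcessingInputs": [
            {
                "InputName": "code",
                "S3Input": {
                    "S3Uri": code_s3_uri,
                    "LocalPath": local_code_dir,
                    "S3DataType": "S3Prefix",
                    "S3InputMode": "File",
                },
            }
        ],
        "ProcessingResources": {
            "ClusterConfig": {
                "InstanceCount": instance_count,
                "InstanceType": instance_type,
                "VolumeSizeInGB": 30,  # hypothetical value
            }
        },
    }


request = build_processing_job_request(
    job_name="spark-amazon-reviews-lambda-demo",
    role_arn="arn:aws:iam::123456789012:role/ExampleSageMakerRole",
    image_uri="123456789012.dkr.ecr.us-east-1.amazonaws.com/amazon-reviews-spark-processor:latest",
    code_s3_uri="s3://<location>/sagemaker/spark-preprocess-reviews-demo/code/preprocess.py",
    arguments=["s3_input_balanced_train_data", "s3://example-bucket/in/train/"],
)
print(request["AppSpecification"]["ContainerEntrypoint"])

# Inside a Lambda handler, the request could then be submitted with:
#   import boto3
#   boto3.client("sagemaker").create_processing_job(**request)
```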

_Notes on not using ProcessingInput and Output:_
* Since Spark natively reads from and writes to S3 using `s3a://`, we can avoid the copy required by ProcessingInput and ProcessingOutput (FullyReplicated or ShardedByS3Key) and just specify the S3 input and output buckets/prefixes.
* See https://github.com/awslabs/amazon-sagemaker-examples/issues/994 for issues related to using /opt/ml/processing/input/ and output/
* If we use ProcessingInput, the data will be copied to each node (which we don't want in this case since Spark already handles this)
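
The sketch below shows one way a Spark script could consume S3 locations passed as plain arguments instead of ProcessingInput and ProcessingOutput. It is not taken from `preprocess-spark-text-to-tfidf-depends-on-scikit.py`; the helper names `parse_name_value_args` and `to_s3a` and the example bucket are hypothetical. It assumes the arguments arrive as alternating name/value pairs, matching the `arguments` list passed to `processor.run()` in this notebook.

```python
from typing import Dict, List


def parse_name_value_args(argv: List[str]) -> Dict[str, str]:
    """Turn ['name1', 'value1', 'name2', 'value2', ...] into a dict."""
    if len(argv) % 2 != 0:
        raise ValueError("Expected an even number of arguments (name/value pairs)")
    return dict(zip(argv[0::2], argv[1::2]))


def to_s3a(uri: str) -> str:
    """Rewrite an s3:// URI to the s3a:// scheme used by Spark's Hadoop S3 connector."""
    if uri.startswith("s3://"):
        return "s3a://" + uri[len("s3://"):]
    return uri


# Hypothetical arguments, shaped like the ones this notebook passes to the job.
args = parse_name_value_args([
    "s3_input_balanced_train_data", "s3://example-bucket/in/train/",
    "s3_output_balanced_train_data", "s3://example-bucket/out/train",
])
train_input = to_s3a(args["s3_input_balanced_train_data"])
print(train_input)

# In a Spark script the path could then be read directly, for example:
#   df = spark.read.csv(path=train_input, header=True)
```

Because every executor reads its own partitions straight from S3, no per-node copy of the dataset is needed.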

In [None]:
from sagemaker.processing import ProcessingOutput

processor.run(code='preprocess-spark-text-to-tfidf-depends-on-scikit.py',
              arguments=['s3_input_balanced_train_data', balanced_train_data_input,
                         's3_input_balanced_validation_data', balanced_validation_data_input,
                         's3_input_balanced_test_data', balanced_test_data_input,
                         's3_output_balanced_train_data', balanced_train_data_tfidf_output,
                         's3_output_balanced_validation_data', balanced_validation_data_tfidf_output,
                         's3_output_balanced_test_data', balanced_test_data_tfidf_output,                         
              ],
              # We need this dummy output to allow us to call 
              #    ProcessingJob.from_processing_name() later 
              #    to describe the job and poll for Completed status
              outputs=[
                       ProcessingOutput(s3_upload_mode='EndOfJob',
                                        output_name='dummy-output',
                                        source='/opt/ml/processing/output')
              ],          
              logs=True,
              wait=False
)

In [None]:
from IPython.display import display, HTML

spark_processing_job_name = processor.jobs[-1].describe()['ProcessingJobName']

display(HTML('<b>Review <a href="https://console.aws.amazon.com/cloudwatch/home?region={}#logStream:group=/aws/sagemaker/ProcessingJobs;prefix={};streamFilter=typeLogStreamPrefix">CloudWatch Logs</a> After About 5 Minutes</b>'.format(region, spark_processing_job_name)))


In [None]:
from IPython.display import display, HTML

# This differs from the job name because the Spark job writes directly to S3 rather than through ProcessingOutput.
spark_processing_job_s3_output_prefix = output_prefix

display(HTML('<b>Review <a href="https://s3.console.aws.amazon.com/s3/buckets/{}/{}/?region={}&tab=overview">S3 Output Data</a> After The Spark Job Has Completed</b>'.format(bucket, spark_processing_job_s3_output_prefix, region)))


# Please Wait Until the Processing Job Completes
Re-run this next cell until the job status shows `Completed`.

In [None]:
running_processor = sagemaker.processing.ProcessingJob.from_processing_name(processing_job_name=spark_processing_job_name,
                                                                            sagemaker_session=sagemaker_session)

processing_job_description = running_processor.describe()

processing_job_status = processing_job_description['ProcessingJobStatus']
print('\n')
print(processing_job_status)
print('\n')

print(processing_job_description)
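
Instead of re-running the status cell by hand, the check can be wrapped in a small polling loop. This is a minimal sketch: the helper name `wait_for_processing_job`, the polling interval, and the poll limit are hypothetical, and the demo drives the loop with a fake `describe` function so that it runs without AWS access.

```python
import time
from typing import Callable, Dict

# Statuses after which a processing job no longer changes state.
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_processing_job(
    describe: Callable[[], Dict],
    poll_seconds: float = 30.0,
    max_polls: int = 240,
) -> str:
    """Call describe() until the job reaches a terminal status, then return that status."""
    for _ in range(max_polls):
        status = describe()["ProcessingJobStatus"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_seconds)
    raise TimeoutError("Processing job did not finish within the polling limit")


# Demo with a fake describe() that reports two in-progress polls, then completion.
_fake_statuses = iter(["InProgress", "InProgress", "Completed"])
final_status = wait_for_processing_job(
    lambda: {"ProcessingJobStatus": next(_fake_statuses)},
    poll_seconds=0,
)
print(final_status)
```

In this notebook, passing `running_processor.describe` as the `describe` argument would poll the real job.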

# Inspect the Processed Output Dataset


## The next cells will not work properly until the job above completes.


List the output files of the transformed dataset to make sure the processing was successful.

In [None]:
!aws s3 ls --recursive $balanced_train_data_tfidf_output/

In [None]:
!aws s3 ls --recursive $balanced_validation_data_tfidf_output/

In [None]:
!aws s3 ls --recursive $balanced_test_data_tfidf_output/

# Pass `spark_processing_job_s3_output_prefix` as input to the next notebook

In [None]:
print(spark_processing_job_s3_output_prefix)

In [None]:
%store spark_processing_job_s3_output_prefix
