# Using SageMaker Processing for post-processing aggregation

There are situations when you need to post-process your batch predictions. A good example of such a use case is calculating aggregate statistics; for instance, in a Named Entity Recognition (NER) task, you might count how many times specific tokens occur in the prediction dataset.

Amazon SageMaker makes it easy to implement this kind of scenario with the [SageMaker Processing](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html) feature. In this notebook, we'll explore how to calculate the total number of occurrences of specific tokens using SageMaker Processing.

With Amazon SageMaker Processing jobs, you can leverage a simplified, managed experience to run data pre- or post-processing and model evaluation workloads on the Amazon SageMaker platform. A processing job downloads input from Amazon Simple Storage Service (Amazon S3), then uploads outputs to Amazon S3 during or after the processing job.


![Overview of an Amazon SageMaker Processing job](./images/proc_1.jpg)

## Building a custom processing container

For this exercise, we will use a simple processing container built from scratch. This shows how easy it is to port your existing processing code or pipeline and run it with SageMaker.

Let's define the parameters of our custom container, then build it and push it to Amazon ECR in your account.

In [44]:
import boto3 
import sagemaker
from sagemaker import get_execution_role

role = get_execution_role()
sess = sagemaker.Session()

ACCOUNT_ID = sess.boto_session.client('sts').get_caller_identity()['Account']
CONTAINER_NAME="custom-processing"
TAG = "latest"
REGION = "us-east-2"

In [45]:
# login to your private ECR
!aws ecr get-login-password --region {REGION} | docker login --username AWS --password-stdin {ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com


Login Succeeded
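
The shell pipeline above pipes a temporary ECR password into `docker login`. If you prefer to stay in Python, boto3's ECR client exposes the same credentials through `get_authorization_token`, whose `authorizationToken` is a base64-encoded `AWS:<password>` string. Below is a minimal sketch that unpacks such a response; the helper name `docker_credentials` is hypothetical, and the sample response is fake data shaped like the real one.

```python
import base64


def docker_credentials(auth_response):
    """Extract (username, password, registry) from an ECR get_authorization_token response.

    The live call would be (not executed here):
        boto3.client("ecr", region_name=REGION).get_authorization_token()
    """
    data = auth_response["authorizationData"][0]
    # The token decodes to "AWS:<password>"; split only on the first colon
    decoded = base64.b64decode(data["authorizationToken"]).decode("utf-8")
    username, password = decoded.split(":", 1)
    return username, password, data["proxyEndpoint"]


# Fake response with the same shape as the real one, for illustration only
fake_response = {
    "authorizationData": [{
        "authorizationToken": base64.b64encode(b"AWS:example-password").decode("ascii"),
        "proxyEndpoint": "https://123456789012.dkr.ecr.us-east-2.amazonaws.com",
    }]
}
print(docker_credentials(fake_response))
```

The returned username and password are what `docker login` expects, and `proxyEndpoint` is the registry address to log in to.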


In [75]:
! pygmentize -l docker Dockerfile.postproc

[34mFROM[39;49;00m[33m python:3.7-slim-buster[39;49;00m

[37m########### Installing packages ##########[39;49;00m
[34mRUN[39;49;00m pip3 install [31mpandas[39;49;00m==[34m0[39;49;00m.25.3 scikit-learn==[34m0[39;49;00m.21.3
[34mENV[39;49;00m[33m PYTHONUNBUFFERED=TRUE[39;49;00m

[37m########### Configure processing scripts ##########[39;49;00m
ARG [31mcode_dir[39;49;00m=/opt/ml/code
[34mRUN[39;49;00m mkdir -p [31m$code_dir[39;49;00m
COPY processing_sources [31m$code_dir[39;49;00m
[34mWORKDIR[39;49;00m[33m $code_dir[39;49;00m

[34mENTRYPOINT[39;49;00m[33m ["python3","processing.py"][39;49;00m
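
Because the Dockerfile uses the exec form of `ENTRYPOINT` and sets `WORKDIR` to `/opt/ml/code`, any `arguments` given to the processing job are appended to `python3 processing.py`, and `processing.py` is resolved relative to that directory. A small sketch of the resulting command line, assuming the job passes only the `--tokens` argument used later in this notebook; `effective_command` is a hypothetical helper, not part of the SageMaker SDK.

```python
import shlex

# Exec-form ENTRYPOINT from Dockerfile.postproc
ENTRYPOINT = ["python3", "processing.py"]


def effective_command(entrypoint, arguments):
    """Return the argv Docker runs: exec-form ENTRYPOINT followed by the container arguments."""
    return list(entrypoint) + list(arguments)


tokens_to_count = ["<B-LOC>", "<E-LOC>", "<I-LOC>", "<S-LOC>"]
cmd = effective_command(ENTRYPOINT, ["--tokens", ",".join(tokens_to_count)])
# shlex.join (Python 3.8+) quotes the last argument because it contains '<' and '>'
print(shlex.join(cmd))
# python3 processing.py --tokens '<B-LOC>,<E-LOC>,<I-LOC>,<S-LOC>'
```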


Now, let's build the container and push it to ECR:

In [None]:
! ./build_and_push.sh $CONTAINER_NAME $TAG Dockerfile.postproc
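
The contents of `build_and_push.sh` aren't shown here. A typical script of this kind creates the ECR repository, builds the image from the given Dockerfile, tags it with the full ECR URI, and pushes it. The sketch below only assembles those commands as argument lists without running anything; the function name `build_and_push_commands`, the placeholder account ID, and the exact steps are assumptions, not the script's actual contents.

```python
def build_and_push_commands(account_id, region, repo, tag, dockerfile):
    """Hypothetical equivalent of build_and_push.sh, returned as argv lists (nothing is executed)."""
    image_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repo}:{tag}"
    local_name = f"{repo}:{tag}"
    return image_uri, [
        # Fails if the repository already exists; a real script would check first,
        # e.g. with `aws ecr describe-repositories --repository-names <repo>`
        ["aws", "ecr", "create-repository", "--repository-name", repo, "--region", region],
        # Build context "." assumes processing_sources/ sits next to the Dockerfile
        ["docker", "build", "-t", local_name, "-f", dockerfile, "."],
        ["docker", "tag", local_name, image_uri],
        ["docker", "push", image_uri],
    ]


image_uri, commands = build_and_push_commands(
    "123456789012", "us-east-2", "custom-processing", "latest", "Dockerfile.postproc")
for cmd in commands:
    print(" ".join(cmd))
# To actually run them (after `docker login`): subprocess.run(cmd, check=True) for each cmd
```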

In [21]:
IMAGE_URI = f"{ACCOUNT_ID}.dkr.ecr.{REGION}.amazonaws.com/{CONTAINER_NAME}:{TAG}"
print("Following container will be used for processing job: ", IMAGE_URI)

Following container will be used for processing job:  553020858742.dkr.ecr.us-east-2.amazonaws.com/custom-processing:latest


## Preparing the processing script


In [77]:
! pygmentize processing_sources/processing.py

[34mfrom[39;49;00m [04m[36margparse[39;49;00m [34mimport[39;49;00m ArgumentParser
[34mimport[39;49;00m [04m[36mos[39;49;00m
[34mimport[39;49;00m [04m[36mjson[39;49;00m

[34mdef[39;49;00m [32mmain[39;49;00m(args):
    
    tokens = args.tokens.split([33m'[39;49;00m[33m,[39;49;00m[33m'[39;49;00m)
    
    [37m# initiate dict to store token counts results[39;49;00m
    token_counts = {token : [34m0[39;49;00m [34mfor[39;49;00m token [35min[39;49;00m tokens}
    
    files = []
    [34mfor[39;49;00m _,_,filenames [35min[39;49;00m os.walk(args.input_dir):
        files.extend(filenames)
    
    [34mfor[39;49;00m [36mfile[39;49;00m [35min[39;49;00m files:
        
        [34mwith[39;49;00m [36mopen[39;49;00m([36mfile[39;49;00m, [33m'[39;49;00m[33mr[39;49;00m[33m'[39;49;00m) [34mas[39;49;00m [36mfile[39;49;00m:
            file_string = [36mfile[39;49;00m.read().replace([33m'[39;49;00m[33m\n[39;49;00m[33m'[39;49;00m,[33m'

## Running the SageMaker Processing job



In [82]:
bucket = "vadimd-batch-transform"  # replace with your own S3 bucket

prefix_input = 'flair-output' 
prefix_output = 'postproc-output' 

input_s3_path = f"s3://{bucket}/{prefix_input}"    # S3 path where results of our inference job are stored
output_s3_path = f"s3://{bucket}/{prefix_output}"  # S3 path where we'll upload aggregated processing results

Define which tokens we'd like to count during processing. This list is joined into a single comma-separated string and passed to our processing script as the `--tokens` command-line argument.

In [78]:
tokens_to_count = ["<B-LOC>", "<E-LOC>", "<I-LOC>", "<S-LOC>"]
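
Since the list travels as one comma-separated string and the script splits it with `args.tokens.split(',')`, a token that itself contains a comma, or an empty token, would not survive the round trip. A minimal guard, assuming you want to fail fast before launching the job; `encode_tokens` and `decode_tokens` are hypothetical helpers.

```python
def encode_tokens(tokens):
    """Join tokens into the single value passed as --tokens, rejecting ones that can't round-trip."""
    bad = [t for t in tokens if not t or "," in t]
    if bad:
        raise ValueError(f"tokens must be non-empty and must not contain ',': {bad}")
    return ",".join(tokens)


def decode_tokens(value):
    """Mirror of args.tokens.split(',') in processing.py."""
    return value.split(",")


tokens_to_count = ["<B-LOC>", "<E-LOC>", "<I-LOC>", "<S-LOC>"]
encoded = encode_tokens(tokens_to_count)
print(encoded)
```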

In [None]:
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput

processor = Processor(image_uri=IMAGE_URI,
                      role=role,
                      instance_count=1,
                      base_job_name="nlp-processing-1",
                      sagemaker_session=sess,
                      instance_type="ml.m5.xlarge")

processor.run(inputs=[ProcessingInput(source=input_s3_path,
                                      destination='/opt/ml/processing/input_data')],
              outputs=[ProcessingOutput(source='/opt/ml/processing/processed_data',
                                        destination=output_s3_path)],
              arguments=["--tokens", ','.join(tokens_to_count)])


Job Name:  nlp-processing-1-2020-06-02-00-11-10-089
Inputs:  [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://vadimd-batch-transform/flair-output', 'LocalPath': '/opt/ml/processing/input_data', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'output-1', 'S3Output': {'S3Uri': 's3://vadimd-batch-transform/postproc-output', 'LocalPath': '/opt/ml/processing/processed_data', 'S3UploadMode': 'EndOfJob'}}]
.............
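
Under the hood, `Processor.run` submits a `CreateProcessingJob` request, and the `Inputs` and `Outputs` printed above mirror its `ProcessingInputs` and `ProcessingOutputConfig` fields. For readers who want to see the whole request, here is a sketch that builds an equivalent dictionary for the low-level boto3 SageMaker client. The helper name, job name, placeholder ARN and bucket, and the volume size are assumptions, and the actual API call is left commented out.

```python
def processing_job_request(job_name, image_uri, role_arn, input_s3, output_s3, arguments,
                           instance_type="ml.m5.xlarge", instance_count=1, volume_gb=30):
    """Build keyword arguments for boto3's sagemaker create_processing_job (hypothetical helper)."""
    return {
        "ProcessingJobName": job_name,
        "RoleArn": role_arn,
        # No ContainerEntrypoint: the image's ENTRYPOINT runs with these arguments appended
        "AppSpecification": {"ImageUri": image_uri, "ContainerArguments": arguments},
        "ProcessingResources": {"ClusterConfig": {
            "InstanceCount": instance_count,
            "InstanceType": instance_type,
            "VolumeSizeInGB": volume_gb,  # assumed size
        }},
        "ProcessingInputs": [{
            "InputName": "input-1",
            "S3Input": {
                "S3Uri": input_s3,
                "LocalPath": "/opt/ml/processing/input_data",
                "S3DataType": "S3Prefix",
                "S3InputMode": "File",
                "S3DataDistributionType": "FullyReplicated",
                "S3CompressionType": "None",
            },
        }],
        "ProcessingOutputConfig": {"Outputs": [{
            "OutputName": "output-1",
            "S3Output": {
                "S3Uri": output_s3,
                "LocalPath": "/opt/ml/processing/processed_data",
                "S3UploadMode": "EndOfJob",
            },
        }]},
    }


request = processing_job_request(
    job_name="nlp-processing-1-manual",
    image_uri="123456789012.dkr.ecr.us-east-2.amazonaws.com/custom-processing:latest",
    role_arn="arn:aws:iam::123456789012:role/ExampleSageMakerRole",
    input_s3="s3://example-bucket/flair-output",
    output_s3="s3://example-bucket/postproc-output",
    arguments=["--tokens", "<B-LOC>,<E-LOC>,<I-LOC>,<S-LOC>"],
)
# boto3.client("sagemaker", region_name="us-east-2").create_processing_job(**request)
```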

Let's download the results file and check that the counts are as expected:

In [85]:
! aws s3 cp s3://vadimd-batch-transform/postproc-output/token_counts.json token_counts.json
! cat token_counts.json

download: s3://vadimd-batch-transform/postproc-output/token_counts.json to ./token_counts.json
{"<B-LOC>": 4, "<E-LOC>": 4, "<I-LOC>": 1, "<S-LOC>": 4}
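
Instead of eyeballing the JSON, you can check it programmatically. A small sketch, assuming the file was downloaded to `token_counts.json` as above; `load_token_counts` is a hypothetical helper that verifies every requested token is present and returns the per-token counts together with their total (13 for the output shown above).

```python
import json


def load_token_counts(path, expected_tokens):
    """Load the aggregated counts and make sure every requested token was reported."""
    with open(path, "r") as f:
        counts = json.load(f)
    missing = [t for t in expected_tokens if t not in counts]
    if missing:
        raise ValueError(f"tokens missing from results: {missing}")
    return counts, sum(counts.values())


# Usage in the notebook:
# counts, total = load_token_counts("token_counts.json", tokens_to_count)
```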