# UDACITY SageMaker Essentials: Processing Job Exercise

In prior exercises, we've been running and rerunning the same preprocessing job over and over again. For cleanly formatted data, it's possible to do some preprocessing utilizing batch transform. However, if slightly more sophisticated processing is needed, we would want to do so through a processing job. Finally, after beating around the bush for a few exercises, we're finally going offload the preprocessing step of our data!

In [1]:
!pip install boto
#!pip install --upgrade pip

Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m[33m
[0m

In [2]:
import sklearn
import boto3
import os


## Preprocessing (for the final time!)

The cell below should be very familiar to you by now. This cell will write the preprocessing code to a file called "HelloBlazePreprocess.py". This code will be utilized by AWS via a SciKitLearn processing interface to process our data. 

In [3]:
%%writefile HelloBlazePreprocess.py

import json
import zipfile

# Function below unzips the archive to the local directory. 

def unzip_data(input_data_path):
    with zipfile.ZipFile(input_data_path, 'r') as input_data_zip:
        input_data_zip.extractall('.')
        return input_data_zip.namelist()[0]

# Input data is a file with a single JSON object per line with the following format: 
# {
#  "reviewerID": <string>,
#  "asin": <string>,
#  "reviewerName" <string>,
#  "helpful": [
#    <int>, (indicating number of "helpful votes")
#    <int>  (indicating total number of votes)
#  ],
#  "reviewText": "<string>",
#  "overall": <int>,
#  "summary": "<string>",
#  "unixReviewTime": <int>,
#  "reviewTime": "<string>"
# }
# 
# We are specifically interested in the fields "helpful" and "reviewText"
#

def label_data(input_data):
    labeled_data = []
    HELPFUL_LABEL = "__label__1"
    UNHELPFUL_LABEL = "__label__2"
     
    for l in open(input_data, 'r'):
        l_object = json.loads(l)
        helpful_votes = float(l_object['helpful'][0])
        total_votes = l_object['helpful'][1]
        reviewText = l_object['reviewText']
        if total_votes != 0:
            if helpful_votes / total_votes > .5:
                labeled_data.append(" ".join([HELPFUL_LABEL, reviewText]))
            elif helpful_votes / total_votes < .5:
                labeled_data.append(" ".join([UNHELPFUL_LABEL, reviewText]))
          
    return labeled_data


# Labeled data is a list of sentences, starting with the label defined in label_data. 

def split_sentences(labeled_data):
    new_split_sentences = []
    for d in labeled_data:
        label = d.split()[0]        
        sentences = " ".join(d.split()[1:]).split(".") # Initially split to separate label, then separate sentences
        for s in sentences:
            if s: # Make sure sentences isn't empty. Common w/ "..."
                new_split_sentences.append(" ".join([label, s]))
    return new_split_sentences

def write_data(data, train_path, test_path, proportion):
    border_index = int(proportion * len(data))
    train_f = open(train_path, 'w')
    test_f = open(test_path, 'w')
    index = 0
    for d in data:
        if index < border_index:
            train_f.write(d + '\n')
        else:
            test_f.write(d + '\n')
        index += 1

if __name__ == "__main__":
    unzipped_path = unzip_data('/opt/ml/processing/input/Toys_and_Games_5.json.zip')
    labeled_data = label_data(unzipped_path)
    new_split_sentence_data = split_sentences(labeled_data)
    write_data(new_split_sentence_data, '/opt/ml/processing/output/train/hello_blaze_train_scikit', '/opt/ml/processing/output/test/hello_blaze_test_scikit', .9)

Overwriting HelloBlazePreprocess.py


## Exercise: Upload unprocessed data to s3

No more local preprocessing! Upload the **raw zipped** Toys_and_Games dataset to s3. 

In [4]:
# Todo
s3_bucket = "mlflowdemo-kb"
s3_prefix = "l2e4"
file_name = "Toys_and_Games_5.json.zip"

def upload_file_to_s3(file_name, s3_prefix):
    object_name = os.path.join(s3_prefix, file_name)
    s3_client = boto3.client('s3')
    try:
        response = s3_client.upload_file(file_name, s3_bucket, object_name)
    except ClientError as e:
        logging.error(e)
        return False

upload_file_to_s3(file_name, s3_prefix)
source_path = "/".join([s3_bucket, s3_prefix, file_name])
print(source_path)

mlflowdemo-kb/l2e4/Toys_and_Games_5.json.zip


## Exercise: Launch a processing job through the SciKitLearn interface. 

We'll be utilizing the SKLearnProcessor object from SageMaker to launch a processing job. Here is some information you'll need to complete the exercise: 

* You will need to use the SKLearnProcessor object. 
* You will need 1 instance of ml.m5.large. You can specify this programatically. 
* You will need an execution role.  

* You will need to call the "run" method on the SKLearnProcessor object.
> * You will need to specify the preprocessing code
> * the S3 path of the unprocessed data
> * a 'local' directory path for the input to be downloaded into
> * 'local' directory paths for where you expect the output to be.

you will need to make use of the ProcessingInput and ProcessingOutput features. Review the preprocessing code for where the output is going to go, and where it expects the input to come from.  
* It is highly recommended that you consult the documentation to help you implement this. https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html

Remember that, conceptually, you are creating a server that our code will be run from. This server will download data, execute code that we specify, and upload data to s3. 

If done successfully, you should see a processing job launch in the SageMaker console. To see it, go to the "processing" drop-down menu on the left-hand side and select "processing jobs." Wait until the job is finished. 

In [5]:
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Get role

role = get_execution_role()

# Create an SKLearnProcessor. Set framework_version='0.20.0'.

sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                    role=role,
                                    instance_type='ml.m5.large',
                                    instance_count=1)

# Start a run job. You will pass in as parameters the local location of the processing code, 
# a processing input object, two processing output objects. The paths that you pass in here are directories, 
# not the files themselves. Check the preprocessing code for a hint about what these directories should be. 

sklearn_processor.run(code='HelloBlazePreprocess.py', # preprocessing code
                      inputs=[ProcessingInput(
                          source ='s3://mlflowdemo-kb/l2e4/Toys_and_Games_5.json.zip', # the S3 path of the unprocessed data
                          destination='/opt/ml/processing/input/data/', # a 'local' directory path for the input to be downloaded into
                      )],
                      outputs=[ProcessingOutput(source='/opt/ml/processing/output/train' ),# a 'local' directory path for where you expect the output for train data to be
                               ProcessingOutput(source='/opt/ml/processing/output/test' )]) # a 'local' directory path for where you expect the output for test data to be 

UnknownServiceError: Unknown service: 'sagemaker-metrics'. Valid service names are: accessanalyzer, account, acm, acm-pca, alexaforbusiness, amp, amplify, amplifybackend, amplifyuibuilder, apigateway, apigatewaymanagementapi, apigatewayv2, appconfig, appconfigdata, appflow, appintegrations, application-autoscaling, application-insights, applicationcostprofiler, appmesh, apprunner, appstream, appsync, athena, auditmanager, autoscaling, autoscaling-plans, backup, backup-gateway, batch, braket, budgets, ce, chime, chime-sdk-identity, chime-sdk-meetings, chime-sdk-messaging, cloud9, cloudcontrol, clouddirectory, cloudformation, cloudfront, cloudhsm, cloudhsmv2, cloudsearch, cloudsearchdomain, cloudtrail, cloudwatch, codeartifact, codebuild, codecommit, codedeploy, codeguru-reviewer, codeguruprofiler, codepipeline, codestar, codestar-connections, codestar-notifications, cognito-identity, cognito-idp, cognito-sync, comprehend, comprehendmedical, compute-optimizer, config, connect, connect-contact-lens, connectparticipant, cur, customer-profiles, databrew, dataexchange, datapipeline, datasync, dax, detective, devicefarm, devops-guru, directconnect, discovery, dlm, dms, docdb, drs, ds, dynamodb, dynamodbstreams, ebs, ec2, ec2-instance-connect, ecr, ecr-public, ecs, efs, eks, elastic-inference, elasticache, elasticbeanstalk, elastictranscoder, elb, elbv2, emr, emr-containers, es, events, evidently, finspace, finspace-data, firehose, fis, fms, forecast, forecastquery, frauddetector, fsx, gamelift, glacier, globalaccelerator, glue, grafana, greengrass, greengrassv2, groundstation, guardduty, health, healthlake, honeycode, iam, identitystore, imagebuilder, importexport, inspector, inspector2, iot, iot-data, iot-jobs-data, iot1click-devices, iot1click-projects, iotanalytics, iotdeviceadvisor, iotevents, iotevents-data, iotfleethub, iotsecuretunneling, iotsitewise, iotthingsgraph, iottwinmaker, iotwireless, ivs, kafka, kafkaconnect, kendra, keyspaces, kinesis, kinesis-video-archived-media, kinesis-video-media, kinesis-video-signaling, kinesisanalytics, kinesisanalyticsv2, kinesisvideo, kms, lakeformation, lambda, lex-models, lex-runtime, lexv2-models, lexv2-runtime, license-manager, lightsail, location, logs, lookoutequipment, lookoutmetrics, lookoutvision, machinelearning, macie, macie2, managedblockchain, marketplace-catalog, marketplace-entitlement, marketplacecommerceanalytics, mediaconnect, mediaconvert, medialive, mediapackage, mediapackage-vod, mediastore, mediastore-data, mediatailor, memorydb, meteringmarketplace, mgh, mgn, migration-hub-refactor-spaces, migrationhub-config, migrationhubstrategy, mobile, mq, mturk, mwaa, neptune, network-firewall, networkmanager, nimble, opensearch, opsworks, opsworkscm, organizations, outposts, panorama, personalize, personalize-events, personalize-runtime, pi, pinpoint, pinpoint-email, pinpoint-sms-voice, polly, pricing, proton, qldb, qldb-session, quicksight, ram, rbin, rds, rds-data, redshift, redshift-data, rekognition, resiliencehub, resource-groups, resourcegroupstaggingapi, robomaker, route53, route53-recovery-cluster, route53-recovery-control-config, route53-recovery-readiness, route53domains, route53resolver, rum, s3, s3control, s3outposts, sagemaker, sagemaker-a2i-runtime, sagemaker-edge, sagemaker-featurestore-runtime, sagemaker-runtime, savingsplans, schemas, sdb, secretsmanager, securityhub, serverlessrepo, service-quotas, servicecatalog, servicecatalog-appregistry, servicediscovery, ses, sesv2, shield, signer, sms, sms-voice, snow-device-management, snowball, sns, sqs, ssm, ssm-contacts, ssm-incidents, sso, sso-admin, sso-oidc, stepfunctions, storagegateway, sts, support, swf, synthetics, textract, timestream-query, timestream-write, transcribe, transfer, translate, voice-id, waf, waf-regional, wafv2, wellarchitected, wisdom, workdocs, worklink, workmail, workmailmessageflow, workspaces, workspaces-web, xray

## Exercise: Sanity Check & Reflection. 

If all goes well, processed data should have been uploaded to S3. If you're having trouble locating the uri, check the `jobs` attribute of the SKLearnProcessor object. 

In [None]:
sklearn_processor.jobs[-1].describe()

Download these datasets and compare them to the datasets that we locally processed. The exact sentences in the training & the test sets may vary depending on your implementation, but the same number of sentences should be present in each job, and there should be one label and one sentence per line.  


Once you've confirmed that the data was accurately processed, take a step back and reflect on what you've done. You've created the individual components necessary to process data, train data, and perform inference on both individual instances of data and large datasets. What are we missing if we wanted to provide a live, continuous service? Keep this question in mind as we move on to designing workflows. 