## Amazon SageMaker Processing jobs

With Amazon SageMaker Processing jobs, you can leverage a simplified, managed experience to run data pre- or post-processing and model evaluation workloads on the Amazon SageMaker platform.

A processing job downloads input from Amazon Simple Storage Service (Amazon S3), then uploads outputs to Amazon S3 during or after the processing job.

<img src="Processing-1.jpg">

This notebook shows how you can:

1. Run a processing job to run a scikit-learn script that cleans, pre-processes, performs feature engineering, and splits the input data into train and test sets.
2. Run a training job on the pre-processed training data to train a model
3. Run a processing job on the pre-processed test data to evaluate the trained model's performance
4. Use your own custom container to run processing jobs with your own Python libraries and dependencies.

The dataset used here is the [Census-Income KDD Dataset](https://archive.ics.uci.edu/ml/datasets/Census-Income+%28KDD%29). You select features from this dataset, clean the data, and turn the data into features that the training algorithm can use to train a binary classification model, and split the data into train and test sets. The task is to predict whether rows representing census responders have an income greater than `$50,000`, or less than `$50,000`. The dataset is heavily class imbalanced, with most records being labeled as earning less than `$50,000`. After training a logistic regression model, you evaluate the model against a hold-out test dataset, and save the classification evaluation metrics, including precision, recall, and F1 score for each label, and accuracy and ROC AUC for the model.

## Data pre-processing and feature engineering

To run the scikit-learn preprocessing script as a processing job, create a `SKLearnProcessor`, which lets you run scripts inside of processing jobs using the scikit-learn image provided.

In [11]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor

region = boto3.session.Session().region_name

role = get_execution_role()


Before introducing the script you use for data cleaning, pre-processing, and feature engineering, inspect the first 20 rows of the dataset. The target is predicting the `income` category. The features from the dataset you select are `age`, `education`, `major industry code`, `class of worker`, `num persons worked for employer`, `capital gains`, `capital losses`, and `dividends from stocks`.

In [12]:
!aws s3 ls  s3://demo-saeed/bigdata/ --human-readable

                           PRE spark-preprocess-demo/
2020-03-21 02:51:48  277.2 MiB People_data3.csv
2020-03-21 04:19:40  187.4 KiB abalone.csv
2020-03-21 04:28:55    5.4 GiB abalone_out1.csv
2020-03-21 02:51:48  143.8 MiB creditcard.csv
2020-03-21 02:51:48    1.0 GiB creditcard180_train.csv
2020-03-21 02:51:48  150.4 MiB creditcard30_train.csv
2020-03-21 02:51:48    6.1 GiB creditcard_out1.csv
2020-03-21 02:51:51   53.9 GiB people_out2.csv
2020-03-21 02:51:51    5.4 GiB people_out3.csv


In [13]:
import pandas as pd

input_data = 's3://demo-saeed/bigdata/people_out3.csv'
df = pd.read_csv(input_data, nrows=10)

df.head(n=10)

Unnamed: 0,id,Email Id,Prefix,Name,Birth Date,Phone Number,Additional Email Id,Address,Zip Code,City,State,Country,Year,Time,Link,Text
0,0,Brent.Pace@testDomain.com,Mrs.,Holly Huynh,01-03-1975,(029)2018188,morganneil@rodriguez-armstrong.biz,"0723 Murphy Freeway\nMarshallshire, MA 36129",97201,Morenofort,Idaho,United States of America,2011,13:21:30,https://green.com/,research
1,1,Susan.Rice@testDomain.com,Dr.,Renee Gallagher,11-08-1970,(0161)4960161,webernathan@rich.net,"31532 Rachel Well\nMartinezfurt, MN 86908",87511,East Williamfort,Massachusetts,Fiji,1973,09:27:29,https://www.rodriguez.net/,stay
2,2,Anthony.Fields@testDomain.com,Dr.,Anne Gilbert,17-02-1983,01134960377,ktaylor@joseph-russell.com,Unit 0528 Box 9792\nDPO AE 88109,86368,North Joshua,Utah,Egypt,1986,19:50:55,http://chang-jackson.biz/,ok
3,3,Christine.Bennett@testDomain.com,Mrs.,Jessica Hodge,28-10-1998,(0306)9990066,jessicahines@prince.org,"15066 Karen Cliffs Apt. 326\nLake Matthew, WV ...",28456,Meyerfort,Alabama,Jordan,1974,19:56:01,http://www.garcia-ross.com/,mind
4,4,Johnny.Tran@testDomain.com,Ms.,Tracy Weaver,14-04-1992,(0114) 4960615,michelle56@porter-turner.com,"599 Conway Vista\nNorth Kevinside, OH 78256",16349,New John,Tennessee,Liechtenstein,2009,15:44:53,https://www.weeks.com/,raise
5,5,Amanda.Williamson@testDomain.com,Mr.,Tiffany Jones,23-03-1984,+44161 4960183,jacqueline89@yahoo.com,"013 Dudley Meadow\nHooverfurt, NH 13359",73296,Port Matthew,Nevada,Holy See (Vatican City State),2018,16:07:03,http://hensley.com/,space
6,6,Madison.Ray@testDomain.com,Dr.,Catherine Brown,29-09-1976,+44(0)141 496 0278,jamestucker@hotmail.com,"PSC 7479, Box 1107\nAPO AP 13014",80892,New Andrealand,Virginia,Iraq,2018,05:53:49,http://www.brooks.net/,back
7,7,Rachel.Kelly@testDomain.com,Mr.,Victoria Trevino,02-08-1999,+44113 496 0189,tevans@hotmail.com,"1923 Lisa Corner Apt. 778\nWest Amandatown, AL...",13880,North Christianville,North Carolina,Bahrain,1994,16:44:26,https://freeman.net/,another
8,8,Ronald.Malone@testDomain.com,Mrs.,Cory Jarvis,17-07-1999,+44(0)8081570874,chenphilip@orr.com,"0583 William Drive Suite 406\nNew Sherry, AZ 4...",74201,South Ginaton,North Dakota,Niger,2003,21:11:56,https://www.smith.com/,fund
9,9,Jasmine.Peterson@testDomain.com,Mrs.,Cassandra Collins,26-01-1999,+441914960255,uramirez@carpenter.com,"80616 William Skyway Suite 099\nNormantown, IA...",36308,Joshuafort,Colorado,Mexico,2016,22:59:06,http://williams.com/,my


This notebook cell writes a file `preprocessing.py`, which contains the pre-processing script. You can update the script, and rerun this cell to overwrite `preprocessing.py`. You run this as a processing job in the next cell. In this script, you

* Remove duplicates and rows with conflicting data
* transform the target `income` column into a column containing two labels.
* transform the `age` and `num persons worked for employer` numerical columns into categorical features by binning them
* scale the continuous `capital gains`, `capital losses`, and `dividends from stocks` so they're suitable for training
* encode the `education`, `major industry code`, `class of worker` so they're suitable for training
* split the data into training and test datasets, and saves the training features and labels and test features and labels.

Our training script will use the pre-processed training features and labels to train a model, and our model evaluation script will use the trained model and pre-processed test features and labels to evaluate the model.

Run this script as a processing job. Use the `SKLearnProcessor.run()` method. You give the `run()` method one `ProcessingInput` where the `source` is the census dataset in Amazon S3, and the `destination` is where the script reads this data from, in this case `/opt/ml/processing/input`. These local paths inside the processing container must begin with `/opt/ml/processing/`.

Also give the `run()` method a `ProcessingOutput`, where the `source` is the path the script writes output data to. For outputs, the `destination` defaults to an S3 bucket that the Amazon SageMaker Python SDK creates for you, following the format `s3://sagemaker-<region>-<account_id>/<processing_job_name>/output/<output_name/`. You also give the ProcessingOutputs values for `output_name`, to make it easier to retrieve these output artifacts after the job is run.

The `arguments` parameter in the `run()` method are command-line arguments in our `preprocessing.py` script.

In [23]:
from datetime import datetime
start = datetime.now()
sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                     role=role,
                                     instance_type='ml.p3.2xlarge',
                                     volume_size_in_gb =100,
                                     instance_count=1)

from sagemaker.processing import ProcessingInput, ProcessingOutput


In [24]:

sklearn_processor.run(code='preprocessing.py',
                      inputs=[ProcessingInput(
                        source=input_data,
                          
                        destination='/opt/ml/processing/input')],
                      outputs=[ProcessingOutput(output_name='train_data',
                                                source='/opt/ml/processing/train')],
                      arguments=['--train-test-split-ratio', '0.2'],
                      wait = False
                     )




Job Name:  sagemaker-scikit-learn-2020-03-24-14-31-27-095
Inputs:  [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://demo-saeed/bigdata/people_out3.csv', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-079329190341/sagemaker-scikit-learn-2020-03-24-14-31-27-095/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train_data', 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-079329190341/sagemaker-scikit-learn-2020-03-24-14-31-27-095/output/train_data', 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}]


ResourceLimitExceeded: An error occurred (ResourceLimitExceeded) when calling the CreateProcessingJob operation: The account-level service limit 'ml.p3.2xlarge for processing job usage' is 4 Instances, with current utilization of 4 Instances and a request delta of 1 Instances. Please contact AWS support to request an increase for this limit.

In [136]:
outputpath = processingJob['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
outputpath
!aws s3 ls {outputpath}/ --human-readable

'sagemaker-scikit-learn-2020-03-24-04-23-01-746'

In [1]:
for i in range(7):
    print(sklearn_processor.jobs[i].describe()['ProcessingJobStatus'], ' ', sklearn_processor.jobs[i].describe()['ProcessingJobName'])

NameError: name 'sklearn_processor' is not defined

In [7]:
import time
import boto3
import pytz
from datetime import datetime
max_time = time.time() + 3*60*60 # 3 hours

client = boto3.client('sagemaker')
while time.time() < max_time:
    processingJob = client.describe_processing_job(ProcessingJobName='sagemaker-scikit-learn-2020-03-24-05-32-57-278')
    #processingJob = sklearn_processor.jobs[5].describe() 
    now = datetime.now(pytz.utc)
    elapsed = now - processingJob["CreationTime"]
    #print(processingJob["CreationTime"])
    #print("creation time: {}".format(processingJob["CreationTime"]))
    print("Job status: {}   (elapsed = {})".format(processingJob['ProcessingJobStatus'], elapsed))
    if processingJob['ProcessingJobStatus'] == "Completed":
        print("ProcessingEndTime: {}".format(processingJob["ProcessingEndTime"]))
        finalduraiton = processingJob["ProcessingEndTime"] - processingJob["ProcessingStartTime"]
        print("Final duration: {} ".format( finalduraiton))
        break
    if processingJob['ProcessingJobStatus'] in ["Failed","Stopped"]:
        break     
        
    time.sleep(60)

Job status: Completed   (elapsed = 7:20:27.773731)
ProcessingEndTime: 2020-03-24 06:16:57+00:00
Final duration: 0:40:03 


Now inspect the output of the pre-processing job, which consists of the processed features.

In [36]:
training_features = pd.read_csv(preprocessed_training_data + '/train_features.csv', nrows=10)
print('Training features shape: {}'.format(training_features.shape))
training_features.head(n=10)

Training features shape: (10, 16)


Unnamed: 0,0,Brent.Pace@testDomain.com,Mrs.,Holly Huynh,01-03-1975,(029)2018188,morganneil@rodriguez-armstrong.biz,"0723 Murphy Freeway Marshallshire, MA 36129",97201,Morenofort,Idaho,United States of America,2011,13:21:30,https://green.com/,research
0,1,Susan.Rice@testDomain.com,Dr.,Renee Gallagher,11-08-1970,(0161)4960161,webernathan@rich.net,"31532 Rachel Well\nMartinezfurt, MN 86908",87511,East Williamfort,Massachusetts,Fiji,1973,09:27:29,https://www.rodriguez.net/,stay
1,2,Anthony.Fields@testDomain.com,Dr.,Anne Gilbert,17-02-1983,01134960377,ktaylor@joseph-russell.com,Unit 0528 Box 9792\nDPO AE 88109,86368,North Joshua,Utah,Egypt,1986,19:50:55,http://chang-jackson.biz/,ok
2,3,Christine.Bennett@testDomain.com,Mrs.,Jessica Hodge,28-10-1998,(0306)9990066,jessicahines@prince.org,"15066 Karen Cliffs Apt. 326\nLake Matthew, WV ...",28456,Meyerfort,Alabama,Jordan,1974,19:56:01,http://www.garcia-ross.com/,mind
3,4,Johnny.Tran@testDomain.com,Ms.,Tracy Weaver,14-04-1992,(0114) 4960615,michelle56@porter-turner.com,"599 Conway Vista\nNorth Kevinside, OH 78256",16349,New John,Tennessee,Liechtenstein,2009,15:44:53,https://www.weeks.com/,raise
4,5,Amanda.Williamson@testDomain.com,Mr.,Tiffany Jones,23-03-1984,+44161 4960183,jacqueline89@yahoo.com,"013 Dudley Meadow\nHooverfurt, NH 13359",73296,Port Matthew,Nevada,Holy See (Vatican City State),2018,16:07:03,http://hensley.com/,space
5,6,Madison.Ray@testDomain.com,Dr.,Catherine Brown,29-09-1976,+44(0)141 496 0278,jamestucker@hotmail.com,"PSC 7479, Box 1107\nAPO AP 13014",80892,New Andrealand,Virginia,Iraq,2018,05:53:49,http://www.brooks.net/,back
6,7,Rachel.Kelly@testDomain.com,Mr.,Victoria Trevino,02-08-1999,+44113 496 0189,tevans@hotmail.com,"1923 Lisa Corner Apt. 778\nWest Amandatown, AL...",13880,North Christianville,North Carolina,Bahrain,1994,16:44:26,https://freeman.net/,another
7,8,Ronald.Malone@testDomain.com,Mrs.,Cory Jarvis,17-07-1999,+44(0)8081570874,chenphilip@orr.com,"0583 William Drive Suite 406\nNew Sherry, AZ 4...",74201,South Ginaton,North Dakota,Niger,2003,21:11:56,https://www.smith.com/,fund
8,9,Jasmine.Peterson@testDomain.com,Mrs.,Cassandra Collins,26-01-1999,+441914960255,uramirez@carpenter.com,"80616 William Skyway Suite 099\nNormantown, IA...",36308,Joshuafort,Colorado,Mexico,2016,22:59:06,http://williams.com/,my
9,10,Brandon.Lopez@testDomain.com,Mr.,Tyler Taylor,07-02-1987,(0909) 8790183,hillmisty@marquez.com,USNS Collins\nFPO AA 71623,33440,West Michael,Ohio,Peru,1974,04:54:29,https://www.phillips-barnes.com/,itself


## Training using the pre-processed data

We create a `SKLearn` instance, which we will use to run a training job using the training script `train.py`.  

In [None]:
from sagemaker.sklearn.estimator import SKLearn

sklearn = SKLearn(
    entry_point='train.py',
    train_instance_type="ml.m5.xlarge",
    role=role)

The training script `train.py` trains a logistic regression model on the training data, and saves the model to the `/opt/ml/model` directory, which Amazon SageMaker tars and uploads into a `model.tar.gz` file into S3 at the end of the training job.

In [None]:
%%writefile train.py

import os

import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.externals import joblib

if __name__=="__main__":
    training_data_directory = '/opt/ml/input/data/train'
    train_features_data = os.path.join(training_data_directory, 'train_features.csv')
    train_labels_data = os.path.join(training_data_directory, 'train_labels.csv')
    print('Reading input data')
    X_train = pd.read_csv(train_features_data, header=None)
    y_train = pd.read_csv(train_labels_data, header=None)

    model = LogisticRegression(class_weight='balanced', solver='lbfgs')
    print('Training LR model')
    model.fit(X_train, y_train)
    model_output_directory = os.path.join('/opt/ml/model', "model.joblib")
    print('Saving model to {}'.format(model_output_directory))
    joblib.dump(model, model_output_directory)

Run the training job using `train.py` on the preprocessed training data.

In [None]:
sklearn.fit({'train': preprocessed_training_data})
training_job_description = sklearn.jobs[-1].describe()
model_data_s3_uri = '{}{}/{}'.format(
    training_job_description['OutputDataConfig']['S3OutputPath'],
    training_job_description['TrainingJobName'],
    'output/model.tar.gz')

## Model Evaluation

`evaluation.py` is the model evaluation script. Since the script also runs using scikit-learn as a dependency,  run this using the `SKLearnProcessor` you created previously. This script takes the trained model and the test dataset as input, and produces a JSON file containing classification evaluation metrics, including precision, recall, and F1 score for each label, and accuracy and ROC AUC for the model.


In [None]:
%%writefile evaluation.py

import json
import os
import tarfile

import pandas as pd

from sklearn.externals import joblib
from sklearn.metrics import classification_report, roc_auc_score, accuracy_score

if __name__=="__main__":
    model_path = os.path.join('/opt/ml/processing/model', 'model.tar.gz')
    print('Extracting model from path: {}'.format(model_path))
    with tarfile.open(model_path) as tar:
        tar.extractall(path='.')
    print('Loading model')
    model = joblib.load('model.joblib')

    print('Loading test input data')
    test_features_data = os.path.join('/opt/ml/processing/test', 'test_features.csv')
    test_labels_data = os.path.join('/opt/ml/processing/test', 'test_labels.csv')

    X_test = pd.read_csv(test_features_data, header=None)
    y_test = pd.read_csv(test_labels_data, header=None)
    predictions = model.predict(X_test)

    print('Creating classification evaluation report')
    report_dict = classification_report(y_test, predictions, output_dict=True)
    report_dict['accuracy'] = accuracy_score(y_test, predictions)
    report_dict['roc_auc'] = roc_auc_score(y_test, predictions)

    print('Classification report:\n{}'.format(report_dict))

    evaluation_output_path = os.path.join('/opt/ml/processing/evaluation', 'evaluation.json')
    print('Saving classification report to {}'.format(evaluation_output_path))

    with open(evaluation_output_path, 'w') as f:
        f.write(json.dumps(report_dict))

In [None]:
import json
from sagemaker.s3 import S3Downloader

sklearn_processor.run(code='evaluation.py',
                      inputs=[ProcessingInput(
                                  source=model_data_s3_uri,
                                  destination='/opt/ml/processing/model'),
                              ProcessingInput(
                                  source=preprocessed_test_data,
                                  destination='/opt/ml/processing/test')],
                      outputs=[ProcessingOutput(output_name='evaluation',
                                  source='/opt/ml/processing/evaluation')]
                     )                    
evaluation_job_description = sklearn_processor.jobs[-1].describe()

Now retrieve the file `evaluation.json` from Amazon S3, which contains the evaluation report.

In [None]:
evaluation_output_config = evaluation_job_description['ProcessingOutputConfig']
for output in evaluation_output_config['Outputs']:
    if output['OutputName'] == 'evaluation':
        evaluation_s3_uri = output['S3Output']['S3Uri'] + '/evaluation.json'
        break

evaluation_output = S3Downloader.read_file(evaluation_s3_uri)
evaluation_output_dict = json.loads(evaluation_output)
print(json.dumps(evaluation_output_dict, sort_keys=True, indent=4))

## Running processing jobs with your own dependencies

Above, you used a processing container that has scikit-learn installed, but you can run your own processing container in your processing job as well, and still provide a script to run within your processing container.

Below, you walk through how to create a processing container, and how to use a `ScriptProcessor` to run your own code within a container. Create a scikit-learn container and run a processing job using the same `preprocessing.py` script you used above. You can provide your own dependencies inside this container to run your processing script with.

In [None]:
!mkdir docker

This is the Dockerfile to create the processing container. Install `pandas` and `scikit-learn` into it. You can install your own dependencies.

In [None]:
%%writefile docker/Dockerfile

FROM python:3.7-slim-buster

RUN pip3 install pandas==0.25.3 scikit-learn==0.21.3
ENV PYTHONUNBUFFERED=TRUE

ENTRYPOINT ["python3"]

This block of code builds the container using the `docker` command, creates an Amazon Elastic Container Registry (Amazon ECR) repository, and pushes the image to Amazon ECR.

In [None]:
import boto3

account_id = boto3.client('sts').get_caller_identity().get('Account')
ecr_repository = 'sagemaker-processing-container'
tag = ':latest'
processing_repository_uri = '{}.dkr.ecr.{}.amazonaws.com/{}'.format(account_id, region, ecr_repository + tag)

# Create ECR repository and push docker image
!docker build -t $ecr_repository docker
!$(aws ecr get-login --region $region --registry-ids $account_id --no-include-email)
!aws ecr create-repository --repository-name $ecr_repository
!docker tag {ecr_repository + tag} $processing_repository_uri
!docker push $processing_repository_uri

The `ScriptProcessor` class lets you run a command inside this container, which you can use to run your own script.

In [None]:
from sagemaker.processing import ScriptProcessor

script_processor = ScriptProcessor(command=['python3'],
                image_uri=processing_repository_uri,
                role=role,
                instance_count=1,
                instance_type='ml.m5.xlarge')

Run the same `preprocessing.py` script you ran above, but now, this code is running inside of the Docker container you built in this notebook, not the scikit-learn image maintained by Amazon SageMaker. You can add the dependencies to the Docker image, and run your own pre-processing, feature-engineering, and model evaluation scripts inside of this container.

In [None]:
script_processor.run(code='preprocessing.py',
                      inputs=[ProcessingInput(
                        source=input_data,
                        destination='/opt/ml/processing/input')],
                      outputs=[ProcessingOutput(output_name='train_data',
                                                source='/opt/ml/processing/train'),
                               ProcessingOutput(output_name='test_data',
                                                source='/opt/ml/processing/test')],
                      arguments=['--train-test-split-ratio', '0.2']
                     )
script_processor_job_description = script_processor.jobs[-1].describe()
print(script_processor_job_description)