# Data Wrangler Processing Job for Customers Dataset

<div class="alert alert-info"> 💡 <strong> Quick Start </strong>
To save your processed data to S3, select the Run menu above and click <strong>Run all cells</strong>. 
<strong><a style="color: #0397a7 " href="#Job-Status-&-S3-Output-Location">
    <u>View the status of the export job and the output S3 location</u></a>.
</strong>
</div>


This notebook executes your Data Wrangler Flow `customers.flow` on the entire dataset using a SageMaker 
Processing Job and will save the processed data to S3.

This notebook saves data from the step `Cast Single Data Type` from `Source: customers.Csv`. To save from a different step, go to Data Wrangler 
to select a new step to export. 

---

## Contents

1. [Inputs and Outputs](#Inputs-and-Outputs)
1. [Run Processing Job](#Run-Processing-Job)
   1. [Job Configurations](#Job-Configurations)
   1. [Create Processing Job](#Create-Processing-Job)
   1. [Job Status & S3 Output Location](#Job-Status-&-S3-Output-Location)
1. [Optional Next Steps](#(Optional)Next-Steps)
    1. [Load Processed Data into Pandas](#Load-Processed-Data-into-Pandas)
    1. [Train a model with SageMaker](#(Optional)Train-a-model-with-SageMaker)
---

### Loading stored variables
If you ran this notebook before, you may want to re-use the resources you already created with AWS. Run the cell below to load any previously created variables. You should see a print-out of the existing variables. If nothing is printed, this is probably the first time you are running the notebook.

In [4]:
import boto3
import sagemaker
region = sagemaker.Session().boto_region_name
boto3.setup_default_session(region_name=region)
boto_session = boto3.Session(region_name=region)
s3_client = boto3.client("s3", region_name=region)
sagemaker_client = boto_session.client("sagemaker")
sess = sagemaker.session.Session(
    boto_session=boto_session, sagemaker_client=sagemaker_client
)

In [2]:
%store -r
%store

Stored variables and their in-db values:
bucket                             -> 'sagemaker-us-east-1-875692608981'
claims_fg_name                     -> 'fraud-detect-demo-claims'
claims_preprocessed                ->       policy_id  incident_severity  num_vehicles_i
claims_table                       -> 'fraud-detect-demo-claims-1636518800'
clarify_expl_job_name              -> 'Clarify-Explainability-2021-11-10-14-35-21-747'
col_order                          -> ['fraud', 'num_injuries', 'incident_severity', 'in
customers_fg_name                  -> 'fraud-detect-demo-customers'
customers_preprocessed             ->       policy_id  customer_age  customer_education 
customers_table                    -> 'fraud-detect-demo-customers-1636518803'
database_name                      -> 'sagemaker_featurestore'
endpoint_config_name               -> 'fraud-detect-demo-xgboost-post-smote-endpoint-con
endpoint_name                      -> 'fraud-detect-demo-2021-11-11-08-19-15'
hyperparameter

In [5]:
# Base directory inside the processing container; input and output paths below are built from it
processing_dir = "/opt/ml/processing"

### Upload raw data to S3
Before you can preprocess the raw data with Data Wrangler, it must exist in S3.

In [6]:
s3_client.upload_file(
    Filename="data/customers.csv", Bucket=bucket, Key=f"{prefix}/data/raw/customers.csv"
)

# Inputs and Outputs

The settings below configure the inputs and outputs for the flow export.

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

In <b>Input - Source</b> you can configure the data sources that Data Wrangler will use as input.

1. For S3 sources, configure the source attribute that points to the input S3 prefixes.
2. For all other sources, configure attributes such as query_string and database in the source's 
<b>DatasetDefinition</b> object.

If you modify the inputs, the provided data must have the same schema and format as the data used in the Flow. 
You should also re-execute the cells in this section if you have modified the settings of any data source.
</div>

In [7]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.dataset_definition.inputs import AthenaDatasetDefinition, DatasetDefinition, RedshiftDatasetDefinition

data_sources = []

## Input - S3 Source: customers.csv

In [52]:
data_sources.append(ProcessingInput(
    source=f"s3://{bucket}/{prefix}/data/raw/customers.csv", # You can override this to point to another dataset on S3
    destination=f"{processing_dir}/customers.csv",
    input_name="customers.csv",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
))

## Output: S3 settings

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

1. <b>bucket</b>: the S3 bucket where Data Wrangler will save the output. The default bucket from 
the SageMaker notebook session is used. 
2. <b>flow_export_id</b>: a randomly generated export ID. The export ID must be unique to ensure the results do not 
conflict with other flow exports. 
3. <b>s3_output_prefix</b>: the directory name in your bucket where your data will be saved.
</div>

In [16]:
import time
import uuid

# unique flow export ID
flow_export_id = f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"
flow_export_name = f"flow-{flow_export_id}"
print(f"Flow export name: {flow_export_name}")

Flow export name: flow-11-12-14-06-75409070
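
As a rough illustration of the export ID format used above, the sketch below rebuilds the ID from a UTC timestamp and the first eight characters of a random UUID, then checks its shape. The helper name `make_flow_export_id` and the pattern `EXPORT_ID_PATTERN` are hypothetical and exist only for this example.

```python
import re
import time
import uuid


def make_flow_export_id() -> str:
    """Build an ID shaped like DD-HH-MM-SS-xxxxxxxx (UTC time plus 8 hex characters)."""
    timestamp = time.strftime("%d-%H-%M-%S", time.gmtime())
    random_suffix = str(uuid.uuid4())[:8]  # first 8 hex characters of a random UUID
    return f"{timestamp}-{random_suffix}"


# Hypothetical pattern, used only to illustrate the shape of the ID.
EXPORT_ID_PATTERN = re.compile(r"^\d{2}-\d{2}-\d{2}-\d{2}-[0-9a-f]{8}$")

export_id = make_flow_export_id()
print(export_id, bool(EXPORT_ID_PATTERN.match(export_id)))
```

The timestamp carries no month or year, so uniqueness across longer periods rests on the random suffix rather than on the time component.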


Below are the inputs required by the SageMaker Python SDK to launch a processing job.

In [22]:
import json
# name of the flow file which should exist in the current notebook working directory
flow_file_name = "flows/customers.flow"

# Load .flow file from current notebook working directory 
!echo "Loading flow file from current notebook working directory: $PWD"

with open(flow_file_name) as f:
    flow = json.load(f)

# Output name is built from the last node's ID and the name of its first output in the flow file.
output_name = (f"{flow['nodes'][-1]['node_id']}.{flow['nodes'][-1]['outputs'][0]['name']}")
print(f"Output name: {output_name}")

s3_output_prefix = f"export-{flow_export_name}/output"
s3_output_path = f"s3://{bucket}/{prefix}/flow/{s3_output_prefix}"
print(f"Flow S3 export result path: {s3_output_path}")

processing_job_output = ProcessingOutput(
    output_name=output_name,
    source="/opt/ml/processing/output",
    destination=s3_output_path,
    s3_upload_mode="EndOfJob"
)

Loading flow file from current notebook working directory: /root/mlt
Output name: dc0ba3db-3a12-49ef-8b39-a7b7867e295b.default
Flow S3 export result path: s3://sagemaker-us-east-1-875692608981/fraud-detect-demo/flow/export-flow-11-12-14-06-75409070/output
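
To make the output-name derivation easier to follow, here is a minimal sketch that applies the same indexing to a tiny in-memory flow. The flow document is hypothetical and heavily simplified (real `.flow` files carry many more fields), and the node IDs are invented for the example.

```python
import json

# Hypothetical, simplified flow document with two nodes.
flow_text = """
{
  "nodes": [
    {"node_id": "source-node", "outputs": [{"name": "default"}]},
    {"node_id": "cast-node", "outputs": [{"name": "default"}]}
  ]
}
"""


def last_node_output_name(flow: dict) -> str:
    """Return '<node_id>.<output name>' for the first output of the last node."""
    last_node = flow["nodes"][-1]
    return f"{last_node['node_id']}.{last_node['outputs'][0]['name']}"


flow = json.loads(flow_text)
print(last_node_output_name(flow))  # cast-node.default
```

Because the expression always reads the last entry of `nodes`, exporting from a different step depends on which node ends up last in the flow file.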


## Upload Flow to S3

To use the Data Wrangler flow as an input to the processing job, first upload your flow file to Amazon S3.

In [23]:
# Upload flow to S3
s3_client = boto3.client("s3")
s3_client.upload_file(flow_file_name, bucket, f"{prefix}/data_wrangler_flows/{flow_export_name}.flow", ExtraArgs={"ServerSideEncryption": "aws:kms"})

flow_s3_uri = f"s3://{bucket}/{prefix}/data_wrangler_flows/{flow_export_name}.flow"

print(f"Data Wrangler flow {flow_file_name} uploaded to {flow_s3_uri}")

Data Wrangler flow flows/customers.flow uploaded to s3://sagemaker-us-east-1-875692608981/fraud-detect-demo/data_wrangler_flows/flow-11-12-14-06-75409070.flow
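
The upload call takes a bucket and a key, while the processing input takes a full `s3://` URI. The sketch below shows one way to convert a URI back into its bucket and key using only the standard library. The helper `split_s3_uri` and the bucket name `example-bucket` are hypothetical.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


bucket_name, key = split_s3_uri(
    "s3://example-bucket/fraud-detect-demo/data_wrangler_flows/flow-example.flow"
)
print(bucket_name)  # example-bucket
print(key)          # fraud-detect-demo/data_wrangler_flows/flow-example.flow
```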


The Data Wrangler Flow is also provided to the Processing Job as an input source which we configure below.

In [24]:
## Input - Flow: customers.flow
flow_input = ProcessingInput(
    source=flow_s3_uri,
    destination=f"{processing_dir}/flow",
    input_name="flow",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
)

# Run Processing Job 
## Job Configurations

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

You can configure the following settings for Processing Jobs. If you change any configurations you will 
need to re-execute this and all cells below it by selecting the Run menu above and clicking 
<b>Run Selected Cells and All Below</b>.

1. IAM role for executing the processing job. 
2. A unique name for the processing job. Give a unique name every time you re-execute processing jobs.
3. Data Wrangler Container URL.
4. Instance count, instance type and storage volume size in GB.
5. Content type for each output. Data Wrangler supports CSV as default and Parquet.
6. Network Isolation settings.
7. KMS key to encrypt output data.
</div>

In [25]:
from sagemaker import image_uris
# IAM role for executing the processing job.
iam_role = sagemaker.get_execution_role()

# Unique processing job name. Give a unique name every time you re-execute processing jobs
processing_job_name = f"data-wrangler-flow-processing-{flow_export_id}"

# Data Wrangler Container URL.
container_uri = image_uris.retrieve(framework='data-wrangler',region=region)
print(f"Container uri: {container_uri}")
# Pinned Data Wrangler Container URL. 
# container_uri_pinned = "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.11.2"

# Processing Job Instance count and instance type.
instance_count = 2
instance_type = "ml.m5.4xlarge"

# Size in GB of the EBS volume to use for storing data during processing
volume_size_in_gb = 30

# Content type for each output. Data Wrangler supports CSV as default and Parquet.
output_content_type = "CSV"

# Network Isolation mode; default is off
enable_network_isolation = False

# Output configuration used as processing job container arguments 
output_config = {
    output_name: {
        "content_type": output_content_type
    }
}

# KMS key for per object encryption; default is None
kms_key = None

Container uri: 663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.x
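
Since the job name must be unique and is built from a fixed prefix plus the export ID, it can help to check it before submitting. The sketch below assumes the naming rule documented for the CreateProcessingJob API (1 to 63 characters, alphanumerics separated by hyphens); treat that rule as an assumption and confirm it against the current API reference. The helper name is hypothetical.

```python
import re

# Assumed constraint for ProcessingJobName: at most 63 characters, alphanumeric
# at both ends, hyphens allowed in between. Verify against the API reference.
JOB_NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}")


def is_valid_processing_job_name(name: str) -> bool:
    """Return True if `name` matches the assumed processing job naming rule."""
    return JOB_NAME_PATTERN.fullmatch(name) is not None


print(is_valid_processing_job_name("data-wrangler-flow-processing-11-12-14-06-75409070"))  # True
print(is_valid_processing_job_name("data_wrangler_flow"))  # False: underscores are not allowed
```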


## Create Processing Job

To launch a Processing Job, you will use the SageMaker Python SDK to create a Processor object.

In [26]:
from sagemaker.processing import Processor
from sagemaker.network import NetworkConfig

processor = Processor(
    role=iam_role,
    image_uri=container_uri,
    instance_count=instance_count,
    instance_type=instance_type,
    volume_size_in_gb=volume_size_in_gb,
    network_config=NetworkConfig(enable_network_isolation=enable_network_isolation),
    sagemaker_session=sess,
    output_kms_key=kms_key
)

# Start Job
processor.run(
    inputs=[flow_input] + data_sources, 
    outputs=[processing_job_output],
    arguments=[f"--output-config '{json.dumps(output_config)}'"],
    wait=False,
    logs=False,
    job_name=processing_job_name
)


Job Name:  data-wrangler-flow-processing-11-12-14-06-75409070
Inputs:  [{'InputName': 'flow', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-875692608981/fraud-detect-demo/data_wrangler_flows/flow-11-12-14-06-75409070.flow', 'LocalPath': '/opt/ml/processing/flow', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'dc0ba3db-3a12-49ef-8b39-a7b7867e295b.default', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-875692608981/fraud-detect-demo/flow/export-flow-11-12-14-06-75409070/output', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
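
To see what the container receives through `arguments`, the sketch below builds the same single-element list from the output name printed earlier and then recovers the JSON from it. How the Data Wrangler container parses this argument is not shown in this notebook, so the round trip is only an illustration of the string's layout.

```python
import json

# Output name as printed earlier in this notebook.
output_name = "dc0ba3db-3a12-49ef-8b39-a7b7867e295b.default"
output_config = {output_name: {"content_type": "CSV"}}

# One list element holding the flag and the single-quoted JSON payload.
arguments = [f"--output-config '{json.dumps(output_config)}'"]
print(arguments[0])

# Recover the JSON by dropping the flag and the surrounding single quotes.
flag, _, quoted_json = arguments[0].partition(" ")
recovered = json.loads(quoted_json.strip("'"))
print(recovered == output_config)  # True
```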


## Job Status & S3 Output Location

The cell below waits for the processing job to finish. If it finishes successfully, the raw parameters used by the 
Processing Job are printed.

In [27]:
s3_job_results_path = f"s3://{bucket}/{prefix}/flow/{s3_output_prefix}/{processing_job_name}"
print(f"Job results are saved to S3 path: {s3_job_results_path}")

job_result = sess.wait_for_processing_job(processing_job_name)
job_result

Job results are saved to S3 path: s3://sagemaker-us-east-1-875692608981/export-flow-11-12-14-06-75409070/output/data-wrangler-flow-processing-11-12-14-06-75409070
.......................................................................!

{'ProcessingInputs': [{'InputName': 'flow',
   'AppManaged': False,
   'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-875692608981/fraud-detect-demo/data_wrangler_flows/flow-11-12-14-06-75409070.flow',
    'LocalPath': '/opt/ml/processing/flow',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}}],
 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'dc0ba3db-3a12-49ef-8b39-a7b7867e295b.default',
    'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-875692608981/fraud-detect-demo/flow/export-flow-11-12-14-06-75409070/output',
     'LocalPath': '/opt/ml/processing/output',
     'S3UploadMode': 'EndOfJob'},
    'AppManaged': False}]},
 'ProcessingJobName': 'data-wrangler-flow-processing-11-12-14-06-75409070',
 'ProcessingResources': {'ClusterConfig': {'InstanceCount': 2,
   'InstanceType': 'ml.m5.4xlarge',
   'VolumeSizeInGB': 30}},
 'StoppingCondition': {'MaxRuntimeInSeconds': 86400},
 'AppSpecifica
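
Waiting for a job amounts to polling its description until it reaches a terminal status. The sketch below shows that loop with a stand-in `describe` callable in place of a real API call. It is not the SDK's implementation; the function name, polling interval and poll limit are assumptions made for the example.

```python
import time
from typing import Callable, Dict

# Statuses after which a processing job no longer changes state.
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_until_done(describe: Callable[[], Dict], poll_seconds: float = 30.0,
                    max_polls: int = 1000) -> Dict:
    """Call `describe` repeatedly until the job reports a terminal status."""
    for _ in range(max_polls):
        description = describe()
        if description["ProcessingJobStatus"] in TERMINAL_STATUSES:
            return description
        time.sleep(poll_seconds)
    raise TimeoutError(f"Job still running after {max_polls} polls")


# Stand-in for a real describe call: returns canned responses in order.
canned = iter([
    {"ProcessingJobStatus": "InProgress"},
    {"ProcessingJobStatus": "InProgress"},
    {"ProcessingJobStatus": "Completed"},
])
result = wait_until_done(lambda: next(canned), poll_seconds=0)
print(result["ProcessingJobStatus"])  # Completed
```

A terminal status is not necessarily a success, so a caller would still need to check whether the status is `Completed` before reading the output.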

<a id='aud-datasets'></a>

## DataSets and Feature Types
[overview](#all-up-overview)
----

In [28]:
customers_dtypes = {
    "policy_id": int,
    "customer_age": int,
    "customer_education": int,
    "months_as_customer": int,
    "policy_deductable": int,
    "policy_annual_premium": int,
    "policy_liability": int,
    "auto_year": int,
    "num_claims_past_year": int,
    "num_insurers_past_5_years": int,
    "customer_gender_male": int,
    "customer_gender_female": int,
    "policy_state_ca": int,
    "policy_state_wa": int,
    "policy_state_az": int,
    "policy_state_or": int,
    "policy_state_nv": int,
    "policy_state_id": int,
    "event_time": float,
}

### Load Processed Data into Pandas

We use the [AWS Data Wrangler library](https://github.com/awslabs/aws-data-wrangler) to load the exported 
dataset into a Pandas dataframe.

In [29]:
import awswrangler as wr

In [30]:
# ======> This is your DataFlow output path if you decide to redo the work in DataFlow on your own
if output_content_type.upper() == "CSV":
    customers_preprocessed = wr.s3.read_csv(
        path=s3_output_path, dataset=True, dtype=customers_dtypes
    )
else:
    print(f"Unexpected output content type {output_content_type}")

%store customers_preprocessed
customers_preprocessed

Stored 'customers_preprocessed' (DataFrame)


Unnamed: 0,policy_id,customer_age,customer_education,months_as_customer,policy_deductable,policy_annual_premium,policy_liability,auto_year,num_claims_past_year,num_insurers_past_5_years,customer_gender_male,customer_gender_female,policy_state_ca,policy_state_wa,policy_state_az,policy_state_or,policy_state_nv,policy_state_id,event_time
0,1,54,2,94,750,3000,1,2006,0,1,0,0,0,1,0,0,0,0,1.636633e+09
1,2,41,3,165,750,2950,0,2012,0,1,1,0,1,0,0,0,0,0,1.636633e+09
2,3,57,3,155,750,3000,0,2017,0,1,0,1,1,0,0,0,0,0,1.636633e+09
3,4,39,4,80,750,3000,2,2020,0,1,0,1,0,0,1,0,0,0,1.636633e+09
4,5,39,1,60,750,3000,0,2018,0,1,0,1,1,0,0,0,0,0,1.636633e+09
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4995,4996,33,2,4,750,2550,1,2015,0,3,1,0,1,0,0,0,0,0,1.636633e+09
4996,4997,45,3,150,750,3000,2,2015,0,1,0,1,0,0,0,0,1,0,1.636633e+09
4997,4998,28,1,87,750,2950,1,2016,0,1,1,0,1,0,0,0,0,0,1.636633e+09
4998,4999,21,1,5,750,3000,1,2018,0,2,0,1,1,0,0,0,0,0,1.636633e+09


We now have a set of Pandas DataFrames that contain the customer and claim data, with the correct data types. When Data Wrangler one-hot encodes a feature, the resulting columns (one feature becomes many columns) default to a float data type. 

<font color ='red'> Note: </font> the data types of the categorical features generated by Data Wrangler are converted explicitly to ensure they are of type integer, so that Clarify will treat them as categorical variables. 
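
The idea behind the explicit type mapping can be sketched without any AWS dependency. The example below casts whole-number floats such as `1.0` to `int` according to a column-to-type mapping, using only the standard library. The sample rows are invented, the mapping is a small subset of `customers_dtypes`, and the sketch is not a description of how the `dtype` argument is handled internally.

```python
import csv
import io

# Subset of the column-to-type mapping used in this notebook.
column_types = {"policy_id": int, "customer_gender_male": int, "event_time": float}

# Invented sample data: the one-hot column arrives as floats.
raw_csv = (
    "policy_id,customer_gender_male,event_time\n"
    "1,0.0,1636633000.0\n"
    "2,1.0,1636633000.0\n"
)


def cast_value(text: str, target: type):
    """Parse `text` as a number and convert it to `target` (int or float)."""
    number = float(text)
    if target is int:
        if not number.is_integer():
            raise ValueError(f"{text!r} is not a whole number")
        return int(number)
    return number


rows = [
    {name: cast_value(value, column_types[name]) for name, value in row.items()}
    for row in csv.DictReader(io.StringIO(raw_csv))
]
print(rows[1])  # {'policy_id': 2, 'customer_gender_male': 1, 'event_time': 1636633000.0}
```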

## (Optional)Train a model with SageMaker
Now that the data has been processed, you may want to train a model using the data. The following shows an 
example of doing so using XGBoost, a popular algorithm. For more information on algorithms available in 
SageMaker, see [Getting Started with SageMaker Algorithms](https://docs.aws.amazon.com/sagemaker/latest/dg/algos.html). 
Note that the XGBoost objective (one of 'binary', 'regression', or 'multiclass'), the other hyperparameters, 
and the content_type used below may not be suitable for the output data and may need to be changed to 
train a proper model. Furthermore, for CSV training, the algorithm assumes that the target 
variable is in the first column. For more information on SageMaker XGBoost, 
see https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html.
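
If the target is not already the first column, the CSV has to be rearranged before training. The sketch below moves a named target column to the front using only the standard library. It also drops the header row, on the assumption that the built-in algorithm expects CSV input without a header; check this against the XGBoost documentation linked above. The column name `fraud` and the sample rows are hypothetical, since the customers output has no target column.

```python
import csv
import io


def to_target_first_csv(csv_text: str, target: str) -> str:
    """Move `target` to the first column and drop the header row."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    target_index = header.index(target)  # raises ValueError if the column is missing
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    for row in reader:
        writer.writerow([row[target_index]] + row[:target_index] + row[target_index + 1:])
    return out.getvalue()


# Invented sample with a hypothetical 'fraud' target in the middle column.
sample = "customer_age,fraud,num_claims_past_year\n54,0,1\n41,1,0\n"
print(to_target_first_csv(sample, target="fraud"))
```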


### Set Training Data path
We set the training input data path from the output of the Data Wrangler processing job.

In [None]:
s3_training_input_path = s3_job_results_path
print(f"training input data path: {s3_training_input_path}")

### Configure the algorithm and training job

The cell below sets the training job hyperparameters. For more information on XGBoost hyperparameters, 
see https://xgboost.readthedocs.io/en/latest/parameter.html.

In [None]:
region = boto3.Session().region_name
container = sagemaker.image_uris.retrieve("xgboost", region, "1.2-1")
hyperparameters = {
    "max_depth":"5",
    "objective": "reg:squarederror",
    "num_round": "10",
}
train_content_type = (
    "application/x-parquet" if output_content_type.upper() == "PARQUET"
    else "text/csv"
)
train_input = sagemaker.inputs.TrainingInput(
    s3_data=s3_training_input_path,
    content_type=train_content_type,
)

### Start the Training Job

The training job is configured using the SageMaker Python SDK Estimator, which is then fit using 
the training data from the Processing Job that was run earlier.

In [None]:
estimator = sagemaker.estimator.Estimator(
    container,
    iam_role,
    hyperparameters=hyperparameters,
    instance_count=1,
    instance_type="ml.m5.2xlarge",
)
estimator.fit({"train": train_input})

Now that you have a trained model there are a number of different things you can do. 
For more details on training with SageMaker, please see 
https://sagemaker.readthedocs.io/en/stable/frameworks/xgboost/using_xgboost.html.