# Save to S3 with a SageMaker Processing Job

<div class="alert alert-info"> 💡 <strong> Quick Start </strong>
To save your processed data to S3, select the Run menu above and click <strong>Run all cells</strong>. 
<strong><a style="color: #0397a7 " href="#Job-Status-&-S3-Output-Location">
    <u>View the status of the export job and the output S3 location</u></a>.
</strong>
</div>


This notebook executes your Data Wrangler Flow `data_wrangler_sample.flow` on the entire dataset using a SageMaker 
Processing Job and will save the processed data to S3.

This notebook saves data from the step `Dataset` from `Source: Titanic_Train.Csv`. To save from a different step, go to Data Wrangler 
to select a new step to export. 

---

## Contents

1. [Inputs and Outputs](#Inputs-and-Outputs)
1. [Run Processing Job](#Run-Processing-Job)
   1. [Job Configurations](#Job-Configurations)
   1. [Create Processing Job](#Create-Processing-Job)
   1. [Job Status & S3 Output Location](#Job-Status-&-S3-Output-Location)
1. [Optional Next Steps](#(Optional)Next-Steps)
    1. [Load Processed Data into Pandas](#(Optional)-Load-Processed-Data-into-Pandas)
    1. [Train a model with SageMaker](#(Optional)Train-a-model-with-SageMaker)
---

# Inputs and Outputs

The below settings configure the inputs and outputs for the flow export.

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

In <b>Input - Source</b> you can configure the data sources that will be used as input by Data Wrangler

1. For S3 sources, configure the source attribute that points to the input S3 prefixes
2. For all other sources, configure attributes like query_string, database in the source's 
<b>DatasetDefinition</b> object.

If you modify the inputs the provided data must have the same schema and format as the data used in the Flow. 
You should also re-execute the cells in this section if you have modified the settings in any data sources.

Parametrized data sources will be ignored when creating ProcessingInputs, and will directly read from the source.
Network isolation is not supported for parametrized data sources.
</div>

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.dataset_definition.inputs import AthenaDatasetDefinition, DatasetDefinition, RedshiftDatasetDefinition

data_sources = []

## Input - S3 Source: titanic_train.csv

In [None]:
data_sources.append(ProcessingInput(
    source="s3://machine-learning-workshop/sagemaker-data-wrangler/input/titanic_train.csv", # You can override this to point to other dataset on S3
    destination="/opt/ml/processing/titanic_train.csv",
    input_name="titanic_train.csv",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
))

## Output: S3 settings

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

1. <b>bucket</b>: you can configure the S3 bucket where Data Wrangler will save the output. The default bucket from 
the SageMaker notebook session is used. 
2. <b>flow_export_id</b>: A randomly generated export id. The export id must be unique to ensure the results do not 
conflict with other flow exports 
3. <b>s3_ouput_prefix</b>:  you can configure the directory name in your bucket where your data will be saved.
</div>

In [None]:
import time
import uuid
import sagemaker

# Sagemaker session
sess = sagemaker.Session()

# You can configure this with your own bucket name, e.g.
# bucket = "my-bucket"
bucket = sess.default_bucket()
print(f"Data Wrangler export storage bucket: {bucket}")

# unique flow export ID
flow_export_id = f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"
flow_export_name = f"flow-{flow_export_id}"

Below are the inputs required by the SageMaker Python SDK to launch a processing job.

In [None]:
# Output name is auto-generated from the select node's ID + output name from the flow file.
output_name = "55d4431e-5e62-4a47-a776-470e6b2fcd4d.default"

s3_output_prefix = f"export-{flow_export_name}/output"
s3_output_path = f"s3://{bucket}/{s3_output_prefix}"
print(f"Flow S3 export result path: {s3_output_path}")

processing_job_output = ProcessingOutput(
    output_name=output_name,
    source="/opt/ml/processing/output",
    destination=s3_output_path,
    s3_upload_mode="EndOfJob"
)

## Upload Flow to S3

To use the Data Wrangler as an input to the processing job,  first upload your flow file to Amazon S3.

In [None]:
import os
import json
import boto3

# name of the flow file which should exist in the current notebook working directory
flow_file_name = "data_wrangler_sample.flow"

# Load .flow file from current notebook working directory 
!echo "Loading flow file from current notebook working directory: $PWD"

with open(flow_file_name) as f:
    flow = json.load(f)

# Upload flow to S3
s3_client = boto3.client("s3")
s3_client.upload_file(flow_file_name, bucket, f"data_wrangler_flows/{flow_export_name}.flow", ExtraArgs={"ServerSideEncryption": "aws:kms"})

flow_s3_uri = f"s3://{bucket}/data_wrangler_flows/{flow_export_name}.flow"

print(f"Data Wrangler flow {flow_file_name} uploaded to {flow_s3_uri}")

The Data Wrangler Flow is also provided to the Processing Job as an input source which we configure below.

In [None]:
## Input - Flow: data_wrangler_sample.flow
flow_input = ProcessingInput(
    source=flow_s3_uri,
    destination="/opt/ml/processing/flow",
    input_name="flow",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
)

# Run Processing Job 
## Job Configurations

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

You can configure the following settings for Processing Jobs. If you change any configurations you will 
need to re-execute this and all cells below it by selecting the Run menu above and click 
<b>Run Selected Cells and All Below</b>

1. IAM role for executing the processing job. 
2. A unique name of the processing job. Give a unique name every time you re-execute processing jobs
3. Data Wrangler Container URL.
4. Instance count, instance type and storage volume size in GB.
5. Content type for each output. Data Wrangler supports CSV as default and Parquet.
6. Network Isolation settings
7. KMS key to encrypt output data
</div>

In [None]:
# IAM role for executing the processing job.
iam_role = sagemaker.get_execution_role()

# Unique processing job name. Give a unique name every time you re-execute processing jobs
processing_job_name = f"data-wrangler-flow-processing-{flow_export_id}"

# Data Wrangler Container URL.
container_uri = "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.x"
# Pinned Data Wrangler Container URL. 
container_uri_pinned = "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.24.2"

# Processing Job Instance count and instance type.
instance_count = 2
instance_type = "ml.m5.4xlarge"

# Size in GB of the EBS volume to use for storing data during processing
volume_size_in_gb = 30

# Content type for each output. Data Wrangler supports CSV as default and Parquet.
output_content_type = "CSV"

# Network Isolation mode; default is off
enable_network_isolation = False

# List of tags to be passed to the processing job
user_tags = []

# Output configuration used as processing job container arguments 
output_config = {
    output_name: {
        "content_type": output_content_type
    }
}

# Refit configuration determines whether Data Wrangler refits the trainable parameters on the entire dataset. 
# When True, the processing job relearns the parameters and outputs a new flow file.
# You can specify the name of the output flow file under 'output_flow'.
# Note: There are length constraints on the container arguments (max 256 characters).
refit_trained_params = {
    "refit": False,
    "output_flow": f"data-wrangler-flow-processing-{flow_export_id}.flow"
}

# KMS key for per object encryption; default is None
kms_key = None

## Create Processing Job

To launch a Processing Job, you will use the SageMaker Python SDK to create a Processor function.

In [None]:
from sagemaker.processing import Processor
from sagemaker.network import NetworkConfig

processor = Processor(
    role=iam_role,
    image_uri=container_uri,
    instance_count=instance_count,
    instance_type=instance_type,
    volume_size_in_gb=volume_size_in_gb,
    network_config=NetworkConfig(enable_network_isolation=enable_network_isolation),
    sagemaker_session=sess,
    output_kms_key=kms_key,
    tags=user_tags
)

# Start Job
processor.run(
    inputs=[flow_input] + data_sources, 
    outputs=[processing_job_output],
    arguments=[f"--output-config '{json.dumps(output_config)}'"] + [f"--refit-trained-params '{json.dumps(refit_trained_params)}'"],
    wait=False,
    logs=False,
    job_name=processing_job_name
)

## Job Status & S3 Output Location

Below you wait for processing job to finish. If it finishes successfully, the raw parameters used by the 
Processing Job will be printed

In [None]:
s3_job_results_path = f"s3://{bucket}/{s3_output_prefix}/{processing_job_name}"
print(f"Job results are saved to S3 path: {s3_job_results_path}")

job_result = sess.wait_for_processing_job(processing_job_name)
job_result

## (Optional)Next Steps

Now that data is available on S3 you can use other SageMaker components that take S3 URIs as input such as 
SageMaker Training, Built-in Algorithms, etc. Similarly you can load the dataset into a Pandas dataframe 
in this notebook for further inspection and work. The examples below show how to do both of these steps.

By default optional steps do not run automatically, set `run_optional_steps` to True if you want to 
execute optional steps

In [None]:
run_optional_steps = False

In [None]:
# This will stop the below cells from executing if "Run All Cells" was used on the notebook.
if not run_optional_steps:
    raise SystemExit("Stop here. Do not automatically execute optional steps.")

### (Optional) Load Processed Data into Pandas

We use the [AWS Data Wrangler library](https://github.com/awslabs/aws-data-wrangler) to load the exported 
dataset into a Pandas dataframe for a preview of first 1000 rows.

In [None]:
!pip install -q awswrangler pandas
import awswrangler as wr

In [None]:
chunksize = 1000

if output_content_type.upper() == "CSV":
    dfs = wr.s3.read_csv(s3_output_path, chunksize=chunksize)
elif output_content_type.upper() == "PARQUET":
    dfs = wr.s3.read_parquet(s3_output_path, chunked=chunksize)
else:
    print(f"Unexpected output content type {output_content_type}") 

df = next(dfs)
df

## (Optional)Train a model with SageMaker
Now that the data has been processed, you may want to train a model using the data. The following shows an 
example of doing so using a popular algorithm - XGBoost. For more information on algorithms available in 
SageMaker, see [Getting Started with SageMaker Algorithms](https://docs.aws.amazon.com/sagemaker/latest/dg/algos.html). 
It is important to note that the following XGBoost objective ['binary', 'regression', 'multiclass'] 
hyperparameters, or content_type may not be suitable for the output data, and will require changes to 
train a proper model. Furthermore, for CSV training, the algorithm assumes that the target 
variable is in the first column. For more information on SageMaker XGBoost, 
see https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html.


### Set Training Data path
We set the training input data path from the output of the Data Wrangler processing job..

In [None]:
s3_training_input_path = s3_job_results_path
print(f"training input data path: {s3_training_input_path}")

### Configure the algorithm and training job

The Training Job hyperparameters are set. For more information on XGBoost Hyperparameters, 
see https://xgboost.readthedocs.io/en/latest/parameter.html.

In [None]:
region = boto3.Session().region_name
container = sagemaker.image_uris.retrieve("xgboost", region, "1.2-1")
hyperparameters = {
    "max_depth":"5",
    "objective": "reg:squarederror",
    "num_round": "10",
}
train_content_type = (
    "application/x-parquet" if output_content_type.upper() == "PARQUET"
    else "text/csv"
)
train_input = sagemaker.inputs.TrainingInput(
    s3_data=s3_training_input_path,
    content_type=train_content_type,
)

### Start the Training Job

The TrainingJob configurations are set using the SageMaker Python SDK Estimator, and which is fit using 
the training data from the Processing Job that was run earlier.

In [None]:
estimator = sagemaker.estimator.Estimator(
    container,
    iam_role,
    hyperparameters=hyperparameters,
    instance_count=1,
    instance_type="ml.m5.2xlarge",
)
estimator.fit({"train": train_input})

Now that you have a trained model there are a number of different things you can do. 
For more details on training with SageMaker, please see 
https://sagemaker.readthedocs.io/en/stable/frameworks/xgboost/using_xgboost.html.