# Create SageMaker Pipelines from a Data Wrangler Flow

You can use Amazon SageMaker Pipelines to create end-to-end workflows that manage and deploy SageMaker jobs. 
Pipelines come with SageMaker Python SDK integration, so you can build each step of your workflow using a 
Python-based interface.

This notebook creates a SageMaker pipeline that runs your Data Wrangler flow `untitled.flow` on the
entire dataset as a data preparation step. You can optionally add more steps to the pipeline.
You will then execute the pipeline and monitor its status using the SageMaker Pipelines APIs.

The pipeline processes data from the step `Sampling` from `Source: Card_Transactiondata.Csv` (**Step Output Name: default**). To export from a different step, go back to Data Wrangler
and select a new step to export. After your workflow is deployed, you can view the Directed Acyclic Graph
(DAG) for your pipeline and manage your executions using Amazon SageMaker Studio.

# Inputs and Outputs

The following settings configure the inputs and outputs for the flow export.

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

In <b>Input - Source</b> you can configure the data sources that Data Wrangler uses as input:

1. For S3 sources, configure the source attribute so that it points to the input S3 prefix.
2. For all other sources, configure attributes such as query_string and database in the source's
<b>DatasetDefinition</b> object.

If you modify the inputs, the new data must have the same schema and format as the data used in the flow.
Re-run the cells in this section whenever you change the settings of any data source.
</div>

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.dataset_definition.inputs import AthenaDatasetDefinition, DatasetDefinition, RedshiftDatasetDefinition

data_sources = []

## Input - S3 Source: card_transactiondata.csv


In [None]:
data_sources.append(ProcessingInput(
    source="s3://creditcardfraud-demo/card_transactiondata.csv", # You can override this to point to another dataset on S3
    destination="/opt/ml/processing/card_transactiondata.csv",
    input_name="card_transactiondata.csv",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
))
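If you override `source` to point to another dataset, a quick check of the URI shape before launching can catch typos early. The following is a minimal sketch using only `urllib.parse` from the standard library; `parse_s3_uri` is a hypothetical helper, not part of the SageMaker SDK, and it validates only the form of the URI, not whether the object exists.

```python
from typing import Tuple
from urllib.parse import urlparse


def parse_s3_uri(uri: str) -> Tuple[str, str]:
    """Split an s3://bucket/key URI into (bucket, key).

    Hypothetical helper for sanity-checking ProcessingInput sources;
    it checks the URI shape only and does not call AWS.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri!r}")
    # urlparse keeps the leading "/" on the path; S3 keys do not include it.
    key = parsed.path.lstrip("/")
    if not key:
        raise ValueError(f"S3 URI has no key or prefix: {uri!r}")
    return parsed.netloc, key


bucket_name, key = parse_s3_uri("s3://creditcardfraud-demo/card_transactiondata.csv")
print(bucket_name, key)
```

The helper deliberately uses a different variable name (`bucket_name`) so it does not overwrite the `bucket` variable defined in the next section.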

## Output: S3 settings

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

1. <b>bucket</b>: the S3 bucket where Data Wrangler saves the output. By default, the default bucket of the
SageMaker session is used.
2. <b>flow_export_id</b>: a randomly generated export ID. The export ID must be unique so that the results do not
conflict with other flow exports.
3. <b>s3_output_prefix</b>: the directory name in your bucket where your data is saved.
</div>

In [None]:
import time
import uuid
import sagemaker

# SageMaker session
sess = sagemaker.Session()

# You can configure this with your own bucket name, e.g.
# bucket = "my-bucket"
bucket = sess.default_bucket()
print(f"Data Wrangler export storage bucket: {bucket}")

# unique flow export ID
flow_export_id = f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"
flow_export_name = f"flow-{flow_export_id}"
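The timestamp part of `flow_export_id` records only the day of the month, hour, minute, and second (no month or year), so the 8-character UUID suffix is what keeps export IDs from colliding. A deterministic variant that takes the time and UUID as arguments makes the format easy to inspect; `make_flow_export_id` is a hypothetical name for a function that reproduces the expression above.

```python
import time
import uuid


def make_flow_export_id(epoch_seconds: float, uid: uuid.UUID) -> str:
    """Reproduce the flow_export_id format: DD-HH-MM-SS-<first 8 chars of a UUID>."""
    stamp = time.strftime("%d-%H-%M-%S", time.gmtime(epoch_seconds))
    return f"{stamp}-{str(uid)[:8]}"


fixed_uid = uuid.UUID("12345678-1234-5678-1234-567812345678")
export_id = make_flow_export_id(0, fixed_uid)
print(export_id)  # 01-00-00-00-12345678

# The same time of day on the same day of a different month gives the same
# timestamp part, so only the UUID suffix tells the two exports apart.
print(make_flow_export_id(31 * 86400, fixed_uid))  # 01-00-00-00-12345678
```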

Below are the inputs required by the SageMaker Python SDK to launch a processing job.

In [None]:
# Output name is auto-generated from the select node's ID + output name from the flow file.
output_name = "1a98adca-9fde-4537-b887-e3841776c6b8.default"

s3_output_prefix = f"export-{flow_export_name}/output"
s3_output_path = f"s3://{bucket}/{s3_output_prefix}"
print(f"Flow S3 export result path: {s3_output_path}")

processing_job_output = ProcessingOutput(
    output_name=output_name,
    source="/opt/ml/processing/output",
    destination=s3_output_path,
    s3_upload_mode="EndOfJob"
)
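`output_name` joins a flow node's ID and one of that node's output names with a dot. The following minimal sketch splits the name back apart and looks up the node in a flow document. It assumes, without confirmation from this notebook, that the `.flow` JSON has a top-level `nodes` list whose entries carry a `node_id` and an `outputs` list of `{"name": ...}` objects. The sample dictionary is a hypothetical, trimmed stand-in for a real flow file.

```python
from typing import Tuple


def split_output_name(name: str) -> Tuple[str, str]:
    """Split '<node_id>.<output>' at the last dot."""
    node_id, sep, output = name.rpartition(".")
    if not sep or not node_id or not output:
        raise ValueError(f"Expected '<node_id>.<output>', got {name!r}")
    return node_id, output


def find_output_node(flow_doc: dict, name: str) -> dict:
    """Return the node that produces `name`, assuming the nodes/outputs layout described above."""
    node_id, output = split_output_name(name)
    for node in flow_doc.get("nodes", []):
        if node.get("node_id") == node_id:
            if any(o.get("name") == output for o in node.get("outputs", [])):
                return node
            raise KeyError(f"Node {node_id} has no output named {output!r}")
    raise KeyError(f"No node with id {node_id} in flow")


# Hypothetical, trimmed-down flow document for illustration only.
sample_flow = {
    "nodes": [
        {"node_id": "1a98adca-9fde-4537-b887-e3841776c6b8", "outputs": [{"name": "default"}]},
    ]
}
node = find_output_node(sample_flow, "1a98adca-9fde-4537-b887-e3841776c6b8.default")
print(node["node_id"])
```

Once the flow file has been loaded into `flow` in the Upload Flow to S3 section, calling `find_output_node(flow, output_name)` would confirm that the exported node exists, provided the schema assumption holds.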

## Upload Flow to S3

To use the Data Wrangler flow as an input to the processing job, first upload your flow file to Amazon S3.

In [None]:
import os
import json
import boto3

# name of the flow file which should exist in the current notebook working directory
flow_file_name = "untitled.flow"

# Load .flow file from current notebook working directory 
!echo "Loading flow file from current notebook working directory: $PWD"

with open(flow_file_name) as f:
    flow = json.load(f)

# Upload flow to S3, encrypted server-side with AWS KMS
flow_s3_key = f"data_wrangler_flows/{flow_export_name}.flow"
s3_client = boto3.client("s3")
s3_client.upload_file(
    flow_file_name,
    bucket,
    flow_s3_key,
    ExtraArgs={"ServerSideEncryption": "aws:kms"},
)

flow_s3_uri = f"s3://{bucket}/{flow_s3_key}"

print(f"Data Wrangler flow {flow_file_name} uploaded to {flow_s3_uri}")

The Data Wrangler flow is also provided to the processing job as an input source, configured below.

In [None]:
## Input - Flow: untitled.flow
flow_input = ProcessingInput(
    source=flow_s3_uri,
    destination="/opt/ml/processing/flow",
    input_name="flow",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
)

## Create Processor

In [None]:
# IAM role for executing the processing job.
iam_role = sagemaker.get_execution_role()

# Unique processing job name. Give a unique name every time you re-execute processing jobs
processing_job_name = f"data-wrangler-flow-processing-{flow_export_id}"

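# Data Wrangler container image URI. Both URIs below point to the us-east-1 registry; use the ones for your Region.
container_uri = "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.x"
# Pinned Data Wrangler container image URI (not used below; substitute it for container_uri to pin the version).
container_uri_pinned = "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.20.1"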

# Processing Job Instance count and instance type.
instance_count = 2
instance_type = "ml.m5.4xlarge"

# Size in GB of the EBS volume to use for storing data during processing
volume_size_in_gb = 30

# Content type for each output. Data Wrangler supports CSV (the default) and Parquet.
output_content_type = "CSV"

# Network Isolation mode; default is off
enable_network_isolation = False

# List of tags to be passed to the processing job
user_tags = []

# Output configuration used as processing job container arguments 
output_config = {
    output_name: {
        "content_type": output_content_type
    }
}

# KMS key for per object encryption; default is None
kms_key = None
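`output_config` maps the output name to its content type, and the `ProcessingStep` later serializes it with `json.dumps` into a single `--output-config` job argument. The following minimal sketch builds that argument string and guards against unsupported content types. `build_output_argument` is a hypothetical helper; the accepted values come from the comment above (CSV and Parquet), and the check is case-sensitive to match the spellings used in this notebook.

```python
import json

# Content types named in the notebook's comment on output_content_type.
SUPPORTED_CONTENT_TYPES = {"CSV", "Parquet"}


def build_output_argument(name: str, content_type: str) -> str:
    """Build the --output-config job argument in the same shape as the ProcessingStep uses."""
    if content_type not in SUPPORTED_CONTENT_TYPES:
        raise ValueError(
            f"Unsupported content type {content_type!r}; "
            f"expected one of {sorted(SUPPORTED_CONTENT_TYPES)}"
        )
    config = {name: {"content_type": content_type}}
    # Same f-string as job_arguments: the JSON document is wrapped in single quotes.
    return f"--output-config '{json.dumps(config)}'"


print(build_output_argument("1a98adca-9fde-4537-b887-e3841776c6b8.default", "Parquet"))
```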

## Create Processing Job

To run the processing job, use the SageMaker Python SDK to create a `Processor` object. The pipeline step defined below uses this processor to launch the job.

In [None]:
from sagemaker.processing import Processor
from sagemaker.network import NetworkConfig

processor = Processor(
    role=iam_role,
    image_uri=container_uri,
    instance_count=instance_count,
    instance_type=instance_type,
    volume_size_in_gb=volume_size_in_gb,
    network_config=NetworkConfig(enable_network_isolation=enable_network_isolation),
    sagemaker_session=sess,
    output_kms_key=kms_key,
    tags=user_tags
)

# Create SageMaker Pipeline 
## Define Pipeline Steps
To create a SageMaker pipeline, you will first create a `ProcessingStep` using the Data Wrangler processor defined above.

In [None]:
from sagemaker.workflow.steps import ProcessingStep

data_wrangler_step = ProcessingStep(
    name="DataWranglerProcessingStep",
    processor=processor,
    inputs=[flow_input] + data_sources, 
    outputs=[processing_job_output],
    job_arguments=[f"--output-config '{json.dumps(output_config)}'"],
)

You can optionally add a `TrainingStep` to the pipeline that trains a model on the preprocessed data. By default, no training step is added; set `add_training_step` below to `True` to add one.

You can also add more steps. To learn more about adding
steps to a pipeline, see
[Define a Pipeline](http://docs.aws.amazon.com/sagemaker/latest/dg/define-pipeline.html)
in the SageMaker documentation.

In [None]:
add_training_step = False

In [None]:
if add_training_step:
    import boto3
    from sagemaker.estimator import Estimator
    from sagemaker.workflow.functions import Join

    region = boto3.Session().region_name

    image_uri = sagemaker.image_uris.retrieve(
        framework="xgboost",
        region=region,
        version="1.2-1",
        py_version="py3",
        instance_type=instance_type,
    )
    xgb_train = Estimator(
        image_uri=image_uri,
        instance_type=instance_type,
        instance_count=1,
        role=iam_role,
    )
    xgb_train.set_hyperparameters(
        objective="reg:squarederror",
        num_round=3,
    )

    from sagemaker.inputs import TrainingInput
    from sagemaker.workflow.steps import TrainingStep

    xgb_input_content_type = None

    if output_content_type == "CSV":
        xgb_input_content_type = 'text/csv'
    elif output_content_type == "Parquet":
        xgb_input_content_type = 'application/x-parquet'

    training_step = TrainingStep(
        name="DataWranglerTrain",
        estimator=xgb_train,
        inputs={
            "train": TrainingInput(
                s3_data=Join(
                    on="/",
                    values=[
                        data_wrangler_step.properties.ProcessingOutputConfig.Outputs[output_name].S3Output.S3Uri,
                        data_wrangler_step.properties.ProcessingJobName,
                        f'{output_name.replace(".", "/")}',
                    ]
                ),
                content_type=xgb_input_content_type
            )
        }
    )
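At run time, `Join` concatenates the processing output URI, the processing job name, and the output name (with dots turned into slashes) using `/`. The following plain-Python sketch performs the same string assembly, with made-up values standing in for the step properties that SageMaker resolves during an execution. `resolve_train_uri` is hypothetical, and the dictionary repeats the content-type mapping used for the training input.

```python
from typing import Dict


def resolve_train_uri(output_s3_uri: str, job_name: str, name: str) -> str:
    """Mimic Join(on="/", values=[S3Uri, ProcessingJobName, name with '.' replaced by '/'])."""
    return "/".join([output_s3_uri, job_name, name.replace(".", "/")])


# Same mapping as the if/elif block that sets xgb_input_content_type.
CONTENT_TYPE_TO_MIME: Dict[str, str] = {"CSV": "text/csv", "Parquet": "application/x-parquet"}

# Hypothetical stand-ins for values resolved during a pipeline execution.
uri = resolve_train_uri(
    "s3://my-bucket/export-flow-01-00-00-00-12345678/output",
    "example-processing-job",
    "1a98adca-9fde-4537-b887-e3841776c6b8.default",
)
print(uri)
print(CONTENT_TYPE_TO_MIME["CSV"])
```

Like any plain join, this sketch does not collapse separators, so an output URI that already ended in `/` would yield a double slash in the training input path.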

## Define a Pipeline of Parameters and Steps
Now you will create the SageMaker pipeline that combines the steps created above so it can be executed. 

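Define pipeline parameters that you can use to parameterize the pipeline. Parameters let you customize individual executions and schedules without modifying the pipeline definition.

The parameters supported in this notebook include:

- `instance_type` - The ml.* instance type of the processing job.
- `instance_count` - The instance count of the processing job.

Note that the `Processor` above was created with the literal `instance_type` and `instance_count` values from the Create Processor section. For these parameters to control the processing job, pass them to `Processor` (`instance_type=instance_type, instance_count=instance_count`) and re-create the `ProcessingStep` after running the cell below.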

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
# Define Pipeline Parameters
instance_type = ParameterString(name="InstanceType", default_value="ml.m5.4xlarge")
instance_count = ParameterInteger(name="InstanceCount", default_value=1)
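Each execution uses a parameter's default value unless a value is supplied when the execution starts, for example through the `parameters` argument of `pipeline.start`. The following is a conceptual sketch of that resolution in plain Python. It models the behavior rather than reproducing how the service implements it, and the service's own validation may differ. `resolve_parameters` and `DEFAULTS` are hypothetical and mirror the two parameters defined above.

```python
from typing import Dict, Optional, Union

ParamValue = Union[str, int]

# Defaults mirroring the InstanceType / InstanceCount parameters defined above.
DEFAULTS: Dict[str, ParamValue] = {"InstanceType": "ml.m5.4xlarge", "InstanceCount": 1}


def resolve_parameters(
    defaults: Dict[str, ParamValue], overrides: Optional[Dict[str, ParamValue]] = None
) -> Dict[str, ParamValue]:
    """Overlay per-execution overrides on the defaults (conceptual model only)."""
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"Unknown pipeline parameters: {sorted(unknown)}")
    for name, value in overrides.items():
        # Keep the declared type: a String parameter takes str, an Integer parameter takes int.
        if type(value) is not type(defaults[name]):
            raise TypeError(
                f"{name} expects {type(defaults[name]).__name__}, got {type(value).__name__}"
            )
    # Build a new dict so the defaults themselves are never modified.
    return {**defaults, **overrides}


print(resolve_parameters(DEFAULTS))
print(resolve_parameters(DEFAULTS, {"InstanceCount": 2}))
```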

You will now create a pipeline with the steps and parameters defined above.

In [None]:
from sagemaker.workflow.pipeline import Pipeline

# Create a unique pipeline name with flow export name
pipeline_name = f"pipeline-{flow_export_name}"

# Combine pipeline steps
pipeline_steps = [data_wrangler_step]
if add_training_step:
    pipeline_steps.append(training_step)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[instance_type, instance_count],
    steps=pipeline_steps,
    sagemaker_session=sess
)
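Pipeline names are constrained. The following minimal check assumes the `PipelineName` rules from the SageMaker CreatePipeline API reference: 1 to 256 characters matching `^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,255}`, which means letters, digits, and hyphens, starting and ending with a letter or digit. `is_valid_pipeline_name` is a hypothetical helper.

```python
import re

# Assumed PipelineName pattern and length limit from the CreatePipeline API reference.
PIPELINE_NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9]){0,255}")
MAX_PIPELINE_NAME_LENGTH = 256


def is_valid_pipeline_name(name: str) -> bool:
    """Return True if `name` satisfies the assumed length and character rules."""
    return (
        len(name) <= MAX_PIPELINE_NAME_LENGTH
        and PIPELINE_NAME_PATTERN.fullmatch(name) is not None
    )


print(is_valid_pipeline_name("pipeline-flow-01-00-00-00-12345678"))  # True
print(is_valid_pipeline_name("pipeline_flow"))  # False: underscores are not allowed
```

Names built as `pipeline-flow-<export id>` contain only digits, lowercase letters, and hyphens, so under this assumption they pass.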

### (Optional) Examining the pipeline definition

The JSON of the pipeline definition can be examined to confirm the pipeline is well-defined and 
the parameters and step properties resolve correctly.

In [None]:
import json

definition = json.loads(pipeline.definition())
definition
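One useful check on `definition` is whether every declared parameter is actually referenced by a step. The following minimal sketch walks the definition and collects parameter references. It assumes, based on how the SDK serializes pipeline variables, that declared parameters are listed under `Parameters` with a `Name` key and that a reference appears as `{"Get": "Parameters.<Name>"}`. The sample dictionary is a trimmed, hypothetical definition.

```python
from typing import Any, Set


def referenced_parameters(node: Any) -> Set[str]:
    """Recursively collect names from {"Get": "Parameters.<Name>"} references."""
    found: Set[str] = set()
    if isinstance(node, dict):
        ref = node.get("Get")
        if isinstance(ref, str) and ref.startswith("Parameters."):
            found.add(ref[len("Parameters."):])
        for value in node.values():
            found |= referenced_parameters(value)
    elif isinstance(node, list):
        for item in node:
            found |= referenced_parameters(item)
    return found


def unused_parameters(pipeline_definition: dict) -> Set[str]:
    """Return declared parameters that no step references."""
    declared = {p["Name"] for p in pipeline_definition.get("Parameters", [])}
    used = referenced_parameters(pipeline_definition.get("Steps", []))
    return declared - used


# Hypothetical, trimmed definition: only InstanceType is wired into the step.
sample_definition = {
    "Parameters": [
        {"Name": "InstanceType", "Type": "String", "DefaultValue": "ml.m5.4xlarge"},
        {"Name": "InstanceCount", "Type": "Integer", "DefaultValue": 1},
    ],
    "Steps": [
        {
            "Name": "DataWranglerProcessingStep",
            "Type": "Processing",
            "Arguments": {
                "ProcessingResources": {
                    "ClusterConfig": {
                        "InstanceType": {"Get": "Parameters.InstanceType"},
                        "InstanceCount": 2,
                    }
                }
            },
        }
    ],
}
print(unused_parameters(sample_definition))  # {'InstanceCount'}
```

You could call `unused_parameters(definition)` on the real definition. Given that the processor in this notebook is built from literal values, both `InstanceType` and `InstanceCount` would be expected to appear as unused.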

## Submit the pipeline to SageMaker and start execution

Submit the pipeline definition to the SageMaker Pipeline service and start an execution. The role passed in 
will be used by the Pipeline service to create all the jobs defined in the steps.

In [None]:
pipeline.upsert(role_arn=iam_role)
execution = pipeline.start()

## Pipeline Operations: Examine and Wait for Pipeline Execution

Wait for the pipeline execution to complete.

In [None]:
execution.wait()

List the steps in the execution. These are the steps in the pipeline that have been resolved by the step 
executor service.

In [None]:
execution.list_steps()
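`execution.list_steps()` returns one dictionary per step, following the ListPipelineExecutionSteps API response, with keys such as `StepName` and `StepStatus`, plus `FailureReason` for failed steps. The following minimal sketch summarizes such a list. `summarize_steps` is hypothetical and the sample entries are made up; in the notebook you would pass `execution.list_steps()` instead.

```python
from collections import Counter
from typing import Dict, List


def summarize_steps(steps: List[Dict]) -> Dict[str, object]:
    """Count step statuses and collect failure reasons keyed by step name."""
    status_counts = Counter(step.get("StepStatus", "Unknown") for step in steps)
    failures = {
        step["StepName"]: step.get("FailureReason", "")
        for step in steps
        if step.get("StepStatus") == "Failed"
    }
    return {"counts": dict(status_counts), "failures": failures}


# Hypothetical list_steps() output, trimmed to the keys used here.
sample_steps = [
    {"StepName": "DataWranglerProcessingStep", "StepStatus": "Succeeded"},
    {"StepName": "ExampleTrainingStep", "StepStatus": "Failed", "FailureReason": "ClientError: example failure"},
]
print(summarize_steps(sample_steps))
```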

You can visualize the pipeline execution status and details in Studio. For details, see
[View, Track, and Execute SageMaker Pipelines in SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-studio.html).

## (Optional) Pipeline cleanup
Set the `pipeline_deletion` flag below to `True` to delete the SageMaker pipeline created in this notebook.

In [None]:
pipeline_deletion = False

In [None]:
if pipeline_deletion:
    pipeline.delete()