# A SageMaker Workflow

**This notebook is for testing purposes only; it has the same content as `pipeline.py`.**

See the notebook `sagemaker-pipelines-project.ipynb` for an end-to-end pipeline.

This notebook is adapted from [workshop/10_pipeline/01_Create_SageMaker_Pipeline_BERT_Reviews.ipynb](https://github.com/data-science-on-aws/workshop/blob/master/10_pipeline/01_Create_SageMaker_Pipeline_BERT_Reviews.ipynb).

The pipeline that we create follows a typical Machine Learning Application pattern of pre-processing, training, evaluation, and model registration:

![A typical ML Application pipeline](https://raw.githubusercontent.com/data-science-on-aws/workshop/23135c38b601894a4bec31a4516415d97a4750cc/10_pipeline/img/pipeline-full.png)

### Create SageMaker Clients and Session

First, we create a new SageMaker Session in the current AWS region. We also get the role ARN for the session.

This role ARN should be the execution role ARN that you set up in the Prerequisites section of this notebook.

**Replace the bucket below with your own bucket name**

In [5]:
from botocore.exceptions import ClientError

import os
import logging
import boto3
import sagemaker
import pandas as pd

sess = sagemaker.Session()
bucket = 'sm-nlp-data'  # replace with your own bucket name
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

sm = boto3.Session().client(service_name="sagemaker", region_name=region)

# Track the Pipeline as an `Experiment`

In [6]:
import time

timestamp = int(time.time())

In [7]:
pipeline_name = "kg-pipeline-{}".format(timestamp)
%store pipeline_name

Stored 'pipeline_name' (str)


In [8]:
from smexperiments.experiment import Experiment

pipeline_experiment = Experiment.create(
    experiment_name=pipeline_name,
    description="SPO Extraction Pipeline Experiment",
    sagemaker_boto_client=sm,
)

pipeline_experiment_name = pipeline_experiment.experiment_name
print("Pipeline experiment name: {}".format(pipeline_experiment_name))

Pipeline experiment name: kg-pipeline-1631099753


In [9]:
%store pipeline_experiment_name

Stored 'pipeline_experiment_name' (str)


# Create the `Trial`

In [10]:
from smexperiments.trial import Trial

pipeline_trial = Trial.create(
    trial_name="trial-{}".format(timestamp), experiment_name=pipeline_experiment_name, sagemaker_boto_client=sm
)

pipeline_trial_name = pipeline_trial.trial_name
print("Trial name: {}".format(pipeline_trial_name))

Trial name: trial-1631099753


In [11]:
%store pipeline_trial_name

Stored 'pipeline_trial_name' (str)


# Define Parameters to Parametrize Pipeline Execution

We define Workflow Parameters by which we can parametrize our Pipeline and vary the values injected and used in Pipeline executions and schedules without having to modify the Pipeline definition.

The supported parameter types include:

* `ParameterString` - representing a `str` Python type
* `ParameterInteger` - representing an `int` Python type
* `ParameterFloat` - representing a `float` Python type

These parameters support providing a default value, which can be overridden on pipeline execution. The default value specified should be an instance of the type of the parameter.
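How defaults and overrides interact can be pictured with a small pure-Python model. This is only a sketch: `PipelineParam` and `resolve` are hypothetical names, not part of the SageMaker SDK, which performs this resolution in the service when an execution starts.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class PipelineParam:
    """Hypothetical stand-in for ParameterString / ParameterInteger / ParameterFloat."""
    name: str
    py_type: type
    default_value: Any

    def __post_init__(self):
        # The default value must be an instance of the parameter's type.
        if not isinstance(self.default_value, self.py_type):
            raise TypeError(
                f"{self.name}: default {self.default_value!r} is not a {self.py_type.__name__}"
            )


def resolve(params: List[PipelineParam], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Return the value each parameter takes for one execution."""
    unknown = set(overrides) - {p.name for p in params}
    if unknown:
        raise KeyError(f"unknown parameters: {sorted(unknown)}")
    # An override wins; otherwise the default defined with the pipeline is used.
    return {p.name: overrides.get(p.name, p.default_value) for p in params}


params = [
    PipelineParam("ProcessingInstanceCount", int, 1),
    PipelineParam("ProcessingInstanceType", str, "ml.c5.2xlarge"),
    PipelineParam("LearningRate", float, 0.005),
]
print(resolve(params, {"LearningRate": 0.001}))
# {'ProcessingInstanceCount': 1, 'ProcessingInstanceType': 'ml.c5.2xlarge', 'LearningRate': 0.001}
```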

The parameters defined in this workflow below include:

* `processing_instance_type` - The `ml.*` instance type of the processing job.
* `processing_instance_count` - The instance count of the processing job. For illustrative purposes only: 1 is the only value that makes sense here.
* `train_instance_type` - The `ml.*` instance type of the training job.
* `model_approval_status` - What approval status to register the trained model with for CI/CD purposes. Defaults to "PendingManualApproval". (NOTE: not available in service yet)
* `input_data` - The S3 URI of the input data

# Pipeline Parameters

In [12]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

# Experiment Parameters

In [13]:
%store -r pipeline_experiment_name

In [14]:
exp_name = ParameterString(
    name="ExperimentName",
    default_value=pipeline_experiment_name,
)

# Processing Step

![Define a Processing Step for Feature Engineering](https://raw.githubusercontent.com/data-science-on-aws/workshop/23135c38b601894a4bec31a4516415d97a4750cc/10_pipeline/img/pipeline-2.png)

If the raw data is not in your S3 bucket yet, run the following cell to download the dataset and upload it to S3.

In [15]:
%%bash -s "$bucket"

wget http://dataset-bj.cdn.bcebos.com/qianyan/DuIE_2_0.zip
aws s3 cp DuIE_2_0.zip "s3://$1/ie-baseline/raw/DuIE_2_0.zip"
rm DuIE_2_0.zip

upload: ./DuIE_2_0.zip to s3://sm-nlp-data/ie-baseline/raw/DuIE_2_0.zip


--2021-09-08 11:15:54--  http://dataset-bj.cdn.bcebos.com/qianyan/DuIE_2_0.zip
Resolving dataset-bj.cdn.bcebos.com (dataset-bj.cdn.bcebos.com)... 221.5.75.35
Connecting to dataset-bj.cdn.bcebos.com (dataset-bj.cdn.bcebos.com)|221.5.75.35|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 37097755 (35M) [application/zip]
Saving to: ‘DuIE_2_0.zip’



In [16]:
raw_input_data_s3_uri = "s3://{}/ie-baseline/raw/DuIE_2_0.zip".format(bucket)
print('input path:', raw_input_data_s3_uri)
processed_data_s3_uri = "s3://{}/ie-baseline/processed/".format(bucket)
print('output dir:', processed_data_s3_uri)

# smaller pseudo dataset
# processed_data_s3_uri = "s3://{}/psudo/processed/".format(bucket)
# raw_input_data_s3_uri = "s3://{}/psudo/DuIE_2_0.zip".format(bucket)

input path: s3://sm-nlp-data/ie-baseline/raw/DuIE_2_0.zip
output dir: s3://sm-nlp-data/ie-baseline/processed/


In [17]:
!aws s3 ls $raw_input_data_s3_uri

2021-09-08 11:16:10   37097755 DuIE_2_0.zip


In [18]:
import time

input_data = ParameterString(
    name="InputData",
    default_value=raw_input_data_s3_uri,
)
output_dir = ParameterString(
    name="OutputData",
    default_value=processed_data_s3_uri,
)
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.c5.2xlarge")

In [19]:
# !pygmentize ./preprocess.py

We create an instance of an `SKLearnProcessor` and use it in our `ProcessingStep`.

We also specify the `framework_version` we will use throughout.

Note the `processing_instance_type` and `processing_instance_count` parameters that are used by the processor instance.

In [20]:
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={"AWS_DEFAULT_REGION": region},
)

INFO:botocore.credentials:Credentials found in config file: ~/.aws/config
INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [21]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processing_inputs = [
    ProcessingInput(
        input_name="raw",
        source=input_data,
        destination="/opt/ml/processing/ie/data/raw",
        s3_data_distribution_type="ShardedByS3Key",
    )
]

processing_outputs = [
    ProcessingOutput(
        output_name="train",
        destination=output_dir.default_value,  # plain default string: an OutputData override at execution time does not change it
        s3_upload_mode="EndOfJob",
        source="/opt/ml/processing/ie/data/processed",
    )
]

processing_step = ProcessingStep(
    name="Processing",
    code="preprocess.py",
    processor=processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    job_arguments=[
        "--input-data",
        processing_inputs[0].destination, # /opt/ml/processing/ie/data/raw
    ],
)

print(processing_step)

ProcessingStep(name='Processing', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)


The processor instance is used to construct a `ProcessingStep`, along with the input and output channels and the code that will be executed when the pipeline runs. This is very similar to a processor instance's `run` method, for those familiar with the existing Python SDK.

Note the `input_data` parameter passed into the `ProcessingStep` as the input data of the step itself. The processor instance uses this input data when it runs.

# Train Step

In [22]:
train_instance_type = ParameterString(name="TrainInstanceType", default_value="ml.g4dn.16xlarge")
train_instance_count = ParameterInteger(name="TrainInstanceCount", default_value=1)

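# Set Up Training Hyperparameters
These values are registered as pipeline parameters. Note that the estimator defined below sets its own `epochs` hyperparameter and does not read `epochs`, `learning_rate`, or `batch_size`, so overriding these parameters at execution time does not change training as written.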

In [23]:
epochs = ParameterInteger(name="Epochs", default_value=1)
learning_rate = ParameterFloat(name="LearningRate", default_value=0.005)
batch_size = ParameterInteger(name="BatchSize", default_value=128)

# Set Up Metrics to Track Model Performance

In [24]:
metric_definitions = [
    {'Name': 'eval:f1', 'Regex': 'f1: ([0-9\\.]+)'},
    {'Name': 'eval:prec', 'Regex': 'precision: ([0-9\\.]+)'},
    {'Name': 'eval:recall', 'Regex': 'recall: ([0-9\\.]+)'}
]
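
SageMaker applies each `Regex` to the training job's log output and records the first capture group as the metric value. Checking the patterns locally against a few log lines can catch mistakes before a training job runs. The sketch below mirrors the patterns defined above; the sample log lines are made up, and the actual output format of `train.py` is an assumption.

```python
import re
from typing import Dict, List

metric_definitions = [
    {"Name": "eval:f1", "Regex": r"f1: ([0-9\.]+)"},
    {"Name": "eval:prec", "Regex": r"precision: ([0-9\.]+)"},
    {"Name": "eval:recall", "Regex": r"recall: ([0-9\.]+)"},
]


def extract_metrics(log_lines: List[str], definitions: List[Dict[str, str]]) -> Dict[str, List[float]]:
    """Collect every value captured by each metric's regex, in log order."""
    found: Dict[str, List[float]] = {d["Name"]: [] for d in definitions}
    for line in log_lines:
        for d in definitions:
            match = re.search(d["Regex"], line)
            if match:
                found[d["Name"]].append(float(match.group(1)))
    return found


# Hypothetical log lines; the real train.py output may look different.
sample_log = [
    "epoch 1 precision: 0.61 recall: 0.48 f1: 0.537",
    "epoch 2 precision: 0.66 recall: 0.55 f1: 0.600",
]
print(extract_metrics(sample_log, metric_definitions))
```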

In [25]:
#  !pygmentize model.py

### Set Up Debugger and Profiler
Define Debugger Rules as described here: https://docs.aws.amazon.com/sagemaker/latest/dg/debugger-built-in-rules.html

In [26]:
from sagemaker.debugger import Rule, ProfilerRule, rule_configs
from sagemaker.debugger import DebuggerHookConfig
from sagemaker.debugger import ProfilerConfig, FrameworkProfile

debugger_hook_config = DebuggerHookConfig(
    s3_output_path="s3://{}/ie-baseline/debug".format(bucket),
)

profiler_config = ProfilerConfig(
    system_monitor_interval_millis=500,
    framework_profile_params=FrameworkProfile(local_path="/opt/ml/output/profiler/", start_step=5, num_steps=10),
)

In [27]:
rules = [ProfilerRule.sagemaker(rule_configs.ProfilerReport())]

# Define a Training Step to Train a Model

We configure an Estimator and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves it to `model_dir` so that it can be hosted later.

We also specify the output path where the model artifacts from training will be saved.

Note that a pipeline parameter such as `train_instance_type` can be used in several places in the pipeline. In this case, `train_instance_type` is passed into the estimator.

In [28]:
from sagemaker.pytorch.estimator import PyTorch
from sagemaker.debugger import TensorBoardOutputConfig
import os

estimator = PyTorch(
    entry_point='train.py',
    source_dir='./',
    role=role,
    instance_type=train_instance_type, # ml.c5.4xlarge, ml.g4dn.4xlarge
    instance_count=train_instance_count,
    framework_version='1.8.1',
    py_version='py3',
    output_path=f"s3://{bucket}/ie-baseline/outputs",
    code_location=f"s3://{bucket}/ie-baseline/source/train", # where custom code will be uploaded 
    hyperparameters={
        'epochs': 20,
        'use-cuda': True,
    },
    metric_definitions=metric_definitions,
    debugger_hook_config=debugger_hook_config,
    profiler_config=profiler_config,
    rules=rules
)

INFO:botocore.credentials:Credentials found in config file: ~/.aws/config


### Set Up Pipeline Step Caching
Cache the results of pipeline steps for a period specified as an [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601#Durations) duration.

More details on SageMaker Pipeline step caching here:  https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html
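`expire_after` takes an ISO 8601 duration such as `PT1H` (one hour) or `P30D` (30 days). To sanity-check a value before using it, the day/hour/minute/second subset of the format can be parsed into a `timedelta`. This is a simplified local sketch (year, month, and week designators are deliberately rejected), not how SageMaker itself parses the value.

```python
import re
from datetime import timedelta

# Optional days, then optional hours/minutes/seconds after the 'T' separator.
_DURATION = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_duration(value: str) -> timedelta:
    """Parse a subset of ISO 8601 durations (e.g. 'PT1H', 'P30D', 'P1DT2H30M')."""
    match = _DURATION.match(value)
    # Reject empty designators such as 'P', 'PT' or a trailing 'T'.
    if not match or value.endswith(("P", "T")):
        raise ValueError(f"unsupported ISO 8601 duration: {value!r}")
    parts = {k: int(v) for k, v in match.groupdict().items() if v is not None}
    return timedelta(**parts)


print(parse_duration("PT1H"))  # 1:00:00
```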

In [29]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")

### Configure Training Step

Finally, we use the estimator instance to construct a `TrainingStep`. Its inputs reference the `Properties` of the prior `ProcessingStep`, and the step runs the training code when the pipeline executes. This is very similar to an estimator's `fit` method, for those familiar with the existing Python SDK.

In particular, we pass the `S3Uri` of the `"train"` output channel to the `TrainingStep`. The `properties` attribute of a Workflow step matches the object model of the corresponding response of a describe call. These properties can be referenced as placeholder values and are resolved, or filled in, at runtime. For example, the `ProcessingStep` `properties` attribute matches the object model of the [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response object.
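Conceptually, a step property is a path into the describe-call response that is recorded when the pipeline is defined and looked up only after the step has run. The hypothetical `PropertyRef` class below mimics that idea in plain Python; the real `sagemaker.workflow.properties.Properties` objects are instead serialized as `Get` expressions in the pipeline definition and resolved by the service. The response dictionary is made up and heavily abbreviated.

```python
from typing import Any, List, Optional, Union


class PropertyRef:
    """Hypothetical placeholder: a step name plus a path into its describe response."""

    def __init__(self, step_name: str, path: Optional[List[Union[str, int]]] = None):
        self.step_name = step_name
        self.path = path or []

    def __getattr__(self, key: str) -> "PropertyRef":
        # Called only for attributes that do not exist, so each access extends the path.
        return PropertyRef(self.step_name, self.path + [key])

    def __getitem__(self, key: Union[str, int]) -> "PropertyRef":
        return PropertyRef(self.step_name, self.path + [key])

    def resolve(self, describe_response: dict) -> Any:
        """Walk the recorded path once the step's describe response is available."""
        node: Any = describe_response
        for key in self.path:
            if isinstance(node, list) and isinstance(key, str):
                # Outputs are a list in the API response; match an entry by OutputName.
                node = next((item for item in node if item.get("OutputName") == key), None)
                if node is None:
                    raise KeyError(key)
            else:
                node = node[key]
        return node


train_uri = PropertyRef("Processing").ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri

response = {
    "ProcessingOutputConfig": {
        "Outputs": [{"OutputName": "train", "S3Output": {"S3Uri": "s3://example-bucket/processed/"}}]
    }
}
print(train_uri.resolve(response))  # s3://example-bucket/processed/
```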

In [30]:
processing_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri

<sagemaker.workflow.properties.Properties at 0x7ffb6f0eea20>

In [31]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

training_step = TrainingStep(
    name="Train",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="application/json",
        ),
    },
    cache_config=cache_config,
)

print(training_step)

TrainingStep(name='Train', display_name=None, description=None, step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None)


In [32]:
str(training_step.properties.OutputDataConfig.S3OutputPath)

'<sagemaker.workflow.properties.Properties object at 0x7ffb6ed67dd8>'

In [33]:
estimator.hyperparameters()

{'epochs': '20', 'use-cuda': 'true'}

# Evaluation Step

![Define a Model Evaluation Step to Evaluate the Trained Model](https://raw.githubusercontent.com/data-science-on-aws/workshop/23135c38b601894a4bec31a4516415d97a4750cc/10_pipeline/img/pipeline-4.png)

First, we develop an evaluation script that will be specified in a Processing step that will perform the model evaluation.

The evaluation script `evaluate.py` takes the trained model and the test dataset as input, and produces a JSON file containing evaluation metrics such as precision, recall, and F1.

After pipeline execution, we will examine the resulting `evaluation.json` for analysis.

The evaluation script:

* loads in the model
* reads in the test data
* runs predictions against the test data
* builds an evaluation report, including the F1 score
* saves the evaluation report to the evaluation directory
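The source of `evaluate.py` is not shown here, so the following is only a hypothetical sketch of the report-building part. It assumes predictions and gold labels are sets of (subject, predicate, object) triples, as in the SPO extraction task this pipeline targets, and writes a flat `evaluation.json` whose top-level `f1` key is what the condition step later reads with `json_path="f1"`. In the processing job, the output directory would be `/opt/ml/processing/output/metrics/`; a temporary directory is used here instead.

```python
import json
import os
import tempfile
from typing import Dict, Iterable, Set, Tuple

Triple = Tuple[str, str, str]  # (subject, predicate, object)


def spo_scores(predicted: Iterable[Set[Triple]], gold: Iterable[Set[Triple]]) -> Dict[str, float]:
    """Micro-averaged precision, recall, and F1 over exact-match triples."""
    tp = n_pred = n_gold = 0
    for pred_set, gold_set in zip(predicted, gold):
        tp += len(pred_set & gold_set)
        n_pred += len(pred_set)
        n_gold += len(gold_set)
    precision = tp / n_pred if n_pred else 0.0
    recall = tp / n_gold if n_gold else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}


def write_report(scores: Dict[str, float], output_dir: str) -> str:
    """Save the scores as evaluation.json in output_dir and return the file path."""
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, "evaluation.json")
    with open(path, "w") as f:
        json.dump(scores, f)
    return path


# Toy example: one sentence, one correct and one wrong predicted triple.
predicted = [{("A", "born_in", "B"), ("A", "wrote", "C")}]
gold = [{("A", "born_in", "B"), ("D", "spouse", "E")}]
scores = spo_scores(predicted, gold)

with tempfile.TemporaryDirectory() as tmp:
    report_path = write_report(scores, os.path.join(tmp, "metrics"))
    with open(report_path) as f:
        saved = json.load(f)
print(saved)  # {'precision': 0.5, 'recall': 0.5, 'f1': 0.5}
```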

Next, we create an instance of an `SKLearnProcessor` (a subclass of `ScriptProcessor`) and use it in our `ProcessingStep`.

Note the `processing_instance_type` parameter passed into the processor.

In [34]:
from sagemaker.sklearn.processing import SKLearnProcessor

evaluation_processor = SKLearnProcessor(
    role=role,
    framework_version="0.23-1",
#     py_version='py3',
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={"AWS_DEFAULT_REGION": region},
    max_runtime_in_seconds=7200
)

INFO:botocore.credentials:Credentials found in config file: ~/.aws/config
INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [35]:
# !pygmentize evaluate.py

We use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that will be executed when the pipeline runs. This is very similar to a processor instance's `run` method, for those familiar with the existing Python SDK.

The `TrainingStep` and `ProcessingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) and  [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response objects, respectively.

In [36]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(name="EvaluationReport", output_name="metrics", path="evaluation.json")

In [44]:
evaluation_step = ProcessingStep(
    name="EvaluateModel",
    processor=evaluation_processor,
    code="evaluate.py",
    inputs=[
        ProcessingInput(
            input_name='model',
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/input/model",
        ),
        ProcessingInput(
            input_name='data',
            source=processing_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            destination="/opt/ml/processing/input/data",
        ),
        ProcessingInput(
            input_name='source',
            # Hyperparameters are stored JSON-encoded, so [1:-1] strips the surrounding quotes
            # to leave the S3 URI of the uploaded training source code.
            source=training_step.arguments['HyperParameters']['sagemaker_submit_directory'][1:-1],
            destination="/opt/ml/processing/input/source/train"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="metrics", s3_upload_mode="EndOfJob", source="/opt/ml/processing/output/metrics/"
        ),
    ],
    job_arguments=[
        "--max-seq-length",
        "128",
        "--source-dir",
        "/opt/ml/processing/input/source/train"
    ],
    property_files=[evaluation_report],
)
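
The `[1:-1]` slice on `sagemaker_submit_directory` works because framework estimators store hyperparameter values JSON-encoded (compare `'use-cuda': 'true'` in the `estimator.hyperparameters()` output above), so the S3 URI string arrives wrapped in double quotes. Decoding with `json.loads` is a slightly more robust alternative, since it also handles escaped characters; `unquote_hyperparameter` is a hypothetical helper, not an SDK function.

```python
import json


def unquote_hyperparameter(value: str) -> str:
    """Decode a JSON-encoded string hyperparameter, e.g. '"s3://b/k"' -> 's3://b/k'.

    Values that are not a JSON-encoded string are returned unchanged.
    """
    try:
        decoded = json.loads(value)
    except json.JSONDecodeError:
        return value  # already a plain, unquoted string
    return decoded if isinstance(decoded, str) else value


encoded = json.dumps("s3://example-bucket/ie-baseline/source/train/sourcedir.tar.gz")
print(encoded)                          # the value with surrounding double quotes
print(unquote_hyperparameter(encoded))  # the bare S3 URI, same as encoded[1:-1]
```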

In [45]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

print(model_metrics)

<sagemaker.model_metrics.ModelMetrics object at 0x7ffb82389a90>


# Create Model Step

In order to perform batch transformation using the example model, create a SageMaker model.

Specifically, pass in the `S3ModelArtifacts` from the `training_step` properties. The `TrainingStep` properties attribute matches the object model of the DescribeTrainingJob response object.

In [46]:
transform_instance_type = ParameterString(name="TransformInstanceType", default_value="ml.c5.4xlarge")

In [47]:
inference_image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=region,
    version="1.8.1",
    py_version="py3",
    instance_type=transform_instance_type,
    image_scope='inference'
)

In [48]:
from sagemaker.pytorch import PyTorchModel
from sagemaker.model import Model
from sagemaker.model import FrameworkModel
import time

# model_s3 = 's3://sm-nlp-data/psudo/model.tar.gz'
model_data = training_step.properties.ModelArtifacts.S3ModelArtifacts
model_name = "transform-model-{}".format(timestamp)

model = PyTorchModel(name=model_name,
                    model_data=model_data,
                    framework_version='1.3.1',
                    py_version='py3',
                    role=role,
                    entry_point='inference.py',
                    source_dir='./',
                    sagemaker_session=sess)



Supply the model input (`instance_type` and `accelerator_type`) for creating the SageMaker Model, and then define the `CreateModelStep`, passing in the inputs and the model instance defined above.


In [49]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep


create_inputs = CreateModelInput(
    instance_type="ml.c5.4xlarge",
    accelerator_type="ml.eia1.medium",
)
step_create_model = CreateModelStep(
    name="CreateKgGenModel",
    model=model,
    inputs=create_inputs,
)


# Define a Transform Step to Perform Batch Transformation

In [50]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.c5.4xlarge",
    instance_count=1,
    output_path=f"s3://{bucket}/ie-baseline/outputs",
)

INFO:botocore.credentials:Credentials found in config file: ~/.aws/config


Run the following cell to upload a test input for the transform step.

In [51]:
!aws s3 cp data/psudo_transform_input.json s3://$bucket/psudo/psudo.json

upload: data/psudo_transform_input.json to s3://sm-nlp-data/psudo/psudo.json


In [52]:
batch_data_uri = f's3://{bucket}/psudo/psudo.json'
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)

In [53]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

step_transform = TransformStep(
    name="KgTransform", transformer=transformer, inputs=TransformInput(data=batch_data)
)

# Define a Register Model Step to Create a Model Package

Use the estimator instance specified in the training step to construct an instance of RegisterModel. The result of executing RegisterModel in a pipeline is a model package. A model package is a reusable model artifacts abstraction that packages all ingredients required for inference. Primarily, it consists of an inference specification that defines the inference image to use along with an optional model weights location.

A model package group is a collection of model packages. A model package group can be created for a specific ML business problem, and new versions of the model packages can be added to it. Typically, customers are expected to create a ModelPackageGroup for a SageMaker pipeline so that model package versions can be added to the group for every SageMaker Pipeline run.

The construction of RegisterModel is similar to an estimator instance's register method in the Python SDK.

Specifically, pass in the `S3ModelArtifacts` from the `training_step` properties. The `TrainingStep` properties attribute matches the object model of the DescribeTrainingJob response object.

Note that the specific model package group name provided in this notebook can be used in the model registry and CI/CD work with SageMaker Projects.
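To make the group/version relationship concrete, here is a toy in-memory registry. Everything in it (`ModelPackageGroup`, `register`, `latest_approved`) is hypothetical and only illustrates the idea; the real registry lives in SageMaker and is driven by `RegisterModel` and the approval status values `PendingManualApproval`, `Approved`, and `Rejected`.

```python
from dataclasses import dataclass, field
from typing import List, Optional

APPROVAL_STATUSES = {"PendingManualApproval", "Approved", "Rejected"}


@dataclass
class ModelPackage:
    version: int
    model_data: str
    approval_status: str


@dataclass
class ModelPackageGroup:
    name: str
    packages: List[ModelPackage] = field(default_factory=list)

    def register(self, model_data: str, approval_status: str = "PendingManualApproval") -> ModelPackage:
        """Append a new version, as each pipeline run would."""
        if approval_status not in APPROVAL_STATUSES:
            raise ValueError(f"invalid approval status: {approval_status}")
        package = ModelPackage(len(self.packages) + 1, model_data, approval_status)
        self.packages.append(package)
        return package

    def latest_approved(self) -> Optional[ModelPackage]:
        """The newest Approved version, e.g. what a CI/CD deploy job might pick."""
        approved = [p for p in self.packages if p.approval_status == "Approved"]
        return approved[-1] if approved else None


group = ModelPackageGroup("KG-Generation-Models-example")
group.register("s3://example-bucket/run1/model.tar.gz", "Approved")
group.register("s3://example-bucket/run2/model.tar.gz")  # pending manual approval
print(group.latest_approved().version)  # 1
```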

In [54]:
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
deploy_instance_type = ParameterString(name="DeployInstanceType", default_value="ml.m4.xlarge")
deploy_instance_count = ParameterInteger(name="DeployInstanceCount", default_value=1)

In [55]:
model_package_group_name = f"KG-Generation-Models-{timestamp}"

print(model_package_group_name)

KG-Generation-Models-1631099753


In [56]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel


model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
step_register = RegisterModel(
    name="KgRegisterModel",
    estimator=estimator,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
#     model_data=model_s3,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.c5.4xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

# Define a Condition Step to Check Accuracy and Conditionally Register Model

Finally, we'd like to register this model only if its F1 score, as determined by our evaluation step `evaluation_step`, reaches a minimum value. A `ConditionStep` allows pipelines to support conditional execution in the pipeline DAG based on conditions of step properties.

Below, we:

* define a `ConditionGreaterThanOrEqualTo` on the F1 value found in the output of the evaluation step, `evaluation_step`.
* use the condition in the list of conditions in a `ConditionStep`
* pass the `RegisterModel` step collection, together with the `CreateModelStep` and `TransformStep`, into the `if_steps` of the `ConditionStep`
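At run time, `JsonGet` reads the `f1` field from the `evaluation.json` captured by `evaluation_report`, and the `ConditionStep` runs its `if_steps` only when the condition holds. A plain-Python sketch of that decision follows; `choose_branch`, the local file, and the score are illustrative assumptions, and in the pipeline this evaluation happens inside the SageMaker service.

```python
import json
import os
import tempfile
from typing import List


def choose_branch(report_path: str, json_path: str, min_value: float,
                  if_steps: List[str], else_steps: List[str]) -> List[str]:
    """Mimic ConditionGreaterThanOrEqualTo(left=JsonGet(...), right=min_value)."""
    with open(report_path) as f:
        value = json.load(f)
    # Only simple dotted paths are handled here, unlike a full JsonPath.
    for key in json_path.split("."):
        value = value[key]
    return if_steps if value >= min_value else else_steps


IF_STEPS = ["KgRegisterModel", "CreateKgGenModel", "KgTransform"]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "evaluation.json")
    with open(path, "w") as f:
        json.dump({"f1": 0.62}, f)  # made-up score
    taken = choose_branch(path, "f1", 0.5, IF_STEPS, [])
    skipped = choose_branch(path, "f1", 0.7, IF_STEPS, [])
print(taken, skipped)
```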

In [57]:
min_f1_value = ParameterFloat(name="MinF1Value", default_value=0.5)

In [58]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)

minimum_f1_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=evaluation_step,
        property_file=evaluation_report,
        json_path="f1",
    ),
    right=min_f1_value,  # minimum F1 threshold
)

minimum_f1_condition_step = ConditionStep(
    name="F1Condition",
    conditions=[minimum_f1_condition],
    if_steps=[step_register, step_create_model, step_transform],  # success, continue with model registration
    else_steps=[],  # fail, end the pipeline
)

See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


# Define a Pipeline of Parameters, Steps, and Conditions

Let's tie it all up into a workflow pipeline so we can execute it, and even schedule it.

A pipeline requires a `name`, `parameters`, and `steps`. Names must be unique within an `(account, region)` pair, so we append the timestamp to the name.

Note:

* All the parameters used in the definitions must be present.
* Steps passed into the pipeline need not be in the order of execution. The SageMaker Workflow service resolves the _data dependency_ DAG as the steps in the execution complete.
* Each step must appear in exactly one place: either the pipeline's step list or the if/else list of a single condition step.
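Because dependencies come from data references (for example, `training_step` consumes `processing_step.properties...`), the execution order can be derived rather than declared. The sketch below runs a topological sort with the standard library's `graphlib` (Python 3.9+) over the dependencies of this pipeline, written out by hand with the step names from the code above (the registration step collection is shown as a single node). It illustrates the idea only and is not the service's actual scheduler.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# step -> steps whose outputs it consumes (or, for the if_steps, the condition that gates them)
dependencies: Dict[str, Set[str]] = {
    "Train": {"Processing"},
    "EvaluateModel": {"Processing", "Train"},
    "F1Condition": {"EvaluateModel"},
    "KgRegisterModel": {"F1Condition", "Train"},
    "CreateKgGenModel": {"F1Condition", "Train"},
    "KgTransform": {"CreateKgGenModel"},
}


def execution_stages(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group steps into stages; steps in the same stage could run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        stages.append(ready)
        sorter.done(*ready)
    return stages


for i, stage in enumerate(execution_stages(dependencies), start=1):
    print(i, stage)
```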

In [59]:
%store -r pipeline_name

In [60]:
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        exp_name,
        input_data,
        output_dir,
        processing_instance_count,
        processing_instance_type,
        
        train_instance_type,
        train_instance_count,
        epochs,
        learning_rate,
        batch_size,
        
        batch_data,
        model_approval_status,
        deploy_instance_type,
        deploy_instance_count,
        transform_instance_type,
        
        min_f1_value
    ],
    steps=[processing_step, training_step, evaluation_step, minimum_f1_condition_step],
    sagemaker_session=sess,
)

Let's examine the JSON of the pipeline definition, which follows the SageMaker Workflow Pipeline DSL specification.

By examining the definition, we're also confirming that the pipeline was well-defined, and that the parameters and step properties resolve correctly.

In [61]:
import json
from pprint import pprint

definition = json.loads(pipeline.definition())

pprint(definition)



{'Metadata': {},
 'Parameters': [{'DefaultValue': 'kg-pipeline-1631099753',
                 'Name': 'ExperimentName',
                 'Type': 'String'},
                {'DefaultValue': 's3://sm-nlp-data/ie-baseline/raw/DuIE_2_0.zip',
                 'Name': 'InputData',
                 'Type': 'String'},
                {'DefaultValue': 's3://sm-nlp-data/ie-baseline/processed/',
                 'Name': 'OutputData',
                 'Type': 'String'},
                {'DefaultValue': 1,
                 'Name': 'ProcessingInstanceCount',
                 'Type': 'Integer'},
                {'DefaultValue': 'ml.c5.2xlarge',
                 'Name': 'ProcessingInstanceType',
                 'Type': 'String'},
                {'DefaultValue': 'ml.g4dn.16xlarge',
                 'Name': 'TrainInstanceType',
                 'Type': 'String'},
                {'DefaultValue': 1,
                 'Name': 'TrainInstanceCount',
                 'Type': 'Integer'},
                {'Def

### Submit the pipeline to SageMaker and start execution

Let's submit our pipeline definition to the workflow service. The role passed in will be used by the workflow service to create all the jobs defined in the steps.

In [62]:
print(pipeline_experiment_name)

kg-pipeline-1631099753


In [63]:
response = pipeline.create(role_arn=role)

pipeline_arn = response["PipelineArn"]
print(pipeline_arn)



arn:aws:sagemaker:us-east-1:093729152554:pipeline/kg-pipeline-1631099753


We'll start the pipeline, accepting all the default parameters.

Values for these pipeline parameters can also be overridden when the pipeline is started.
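Overrides are passed to `pipeline.start(parameters=...)` as a dict keyed by each parameter's `name`. A quick local check against the declared names catches typos and stale names (such as `MaxSeqLength`, which this pipeline does not define) before the request is sent. `check_overrides` is a hypothetical helper; the set of names mirrors the parameters passed to `Pipeline(...)` in this notebook.

```python
from typing import Any, Dict, Iterable

# Parameter names declared in the Pipeline(...) call of this notebook.
DECLARED = {
    "ExperimentName", "InputData", "OutputData",
    "ProcessingInstanceCount", "ProcessingInstanceType",
    "TrainInstanceType", "TrainInstanceCount", "Epochs", "LearningRate", "BatchSize",
    "BatchData", "ModelApprovalStatus", "DeployInstanceType", "DeployInstanceCount",
    "TransformInstanceType", "MinF1Value",
}


def check_overrides(overrides: Dict[str, Any], declared: Iterable[str] = DECLARED) -> Dict[str, Any]:
    """Raise if any override key is not a declared pipeline parameter name."""
    unknown = sorted(set(overrides) - set(declared))
    if unknown:
        raise ValueError(f"not pipeline parameters: {unknown}")
    return overrides


overrides = check_overrides({"Epochs": 2, "MinF1Value": 0.6})
# execution = pipeline.start(parameters=overrides)
```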

In [64]:
# Optional overrides keyed by parameter name. The start() call below ignores them and
# runs with the defaults; pass them with pipeline.start(parameters=parameters) to use them.
parameters=dict(
#     InputData=raw_input_data_s3_uri,
#     ProcessingInstanceCount=1,
#     ProcessingInstanceType="ml.c5.2xlarge",
#     TrainInstanceType="ml.g4dn.16xlarge",
#     TrainInstanceCount=1,
#     Epochs=1,
#     LearningRate=0.005,
#     BatchSize=128,
#     ModelApprovalStatus="PendingManualApproval",
#     DeployInstanceType="ml.m4.xlarge",
#     DeployInstanceCount=1,
#     MinF1Value=0.5,
)

execution = pipeline.start()

print(execution.arn)

arn:aws:sagemaker:us-east-1:093729152554:pipeline/kg-pipeline-1631099753/execution/himz92gfbv1u


### Workflow Operations: examining and waiting for pipeline execution

Now we describe the execution instance and list the steps in the execution to find out more about it.

In [65]:
from pprint import pprint

execution_run = execution.describe()
pprint(execution_run)

{'CreatedBy': {},
 'CreationTime': datetime.datetime(2021, 9, 8, 11, 17, 37, 291000, tzinfo=tzlocal()),
 'LastModifiedBy': {},
 'LastModifiedTime': datetime.datetime(2021, 9, 8, 11, 17, 37, 291000, tzinfo=tzlocal()),
 'PipelineArn': 'arn:aws:sagemaker:us-east-1:093729152554:pipeline/kg-pipeline-1631099753',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:093729152554:pipeline/kg-pipeline-1631099753/execution/himz92gfbv1u',
 'PipelineExecutionDisplayName': 'execution-1631099857373',
 'PipelineExecutionStatus': 'Executing',
 'ResponseMetadata': {'HTTPHeaders': {'content-length': '409',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Wed, 08 Sep 2021 11:17:37 GMT',
                                      'x-amzn-requestid': '8cb43da2-03a5-4135-8b80-21208bd27cad'},
                      'HTTPStatusCode': 200,
                      'RequestId': '8cb43da2-03a5-4135-8b80-21208bd27cad',
                     

# Add Execution Run as Trial to Experiments

In [66]:
execution_run_name = execution_run["PipelineExecutionDisplayName"]
print(execution_run_name)

execution-1631099857373


In [67]:
pipeline_execution_arn = execution_run["PipelineExecutionArn"]
print(pipeline_execution_arn)

arn:aws:sagemaker:us-east-1:093729152554:pipeline/kg-pipeline-1631099753/execution/himz92gfbv1u
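The execution ARN printed above encodes the region, account, pipeline name and execution ID. The same execution ID (`himz92gfbv1u`) also shows up in generated job names such as `pipelines-himz92gfbv1u-processing-nztw3rk4jr`, so it is handy to pull it out. A minimal sketch using plain string operations, assuming the `arn:aws:sagemaker:<region>:<account>:pipeline/<name>/execution/<id>` layout seen in this notebook; `parse_execution_arn` and `PipelineExecutionArn` are hypothetical helpers, not part of the SageMaker SDK:

```python
from typing import NamedTuple


class PipelineExecutionArn(NamedTuple):
    region: str
    account: str
    pipeline_name: str
    execution_id: str


def parse_execution_arn(arn: str) -> PipelineExecutionArn:
    """Split a SageMaker pipeline execution ARN into its components.

    Assumed layout: arn:aws:sagemaker:<region>:<account>:pipeline/<name>/execution/<id>
    """
    fields = arn.split(":", 5)
    if len(fields) != 6 or fields[0] != "arn" or fields[2] != "sagemaker":
        raise ValueError(f"Not a SageMaker ARN: {arn}")
    _, _partition, _service, region, account, resource = fields
    parts = resource.split("/")
    if len(parts) != 4 or parts[0] != "pipeline" or parts[2] != "execution":
        raise ValueError(f"Not a pipeline execution ARN: {arn}")
    return PipelineExecutionArn(region, account, parts[1], parts[3])


parsed = parse_execution_arn(
    "arn:aws:sagemaker:us-east-1:093729152554:pipeline/kg-pipeline-1631099753/execution/himz92gfbv1u"
)
print(parsed.pipeline_name, parsed.execution_id)  # kg-pipeline-1631099753 himz92gfbv1u
```

In the notebook the same call can be applied directly to `pipeline_execution_arn`.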


# List Execution Steps

In [68]:
import time

# Give the first step time to start up
time.sleep(30)

execution.list_steps()

[{'StepName': 'Processing',
  'StartTime': datetime.datetime(2021, 9, 8, 11, 17, 38, 721000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:093729152554:processing-job/pipelines-himz92gfbv1u-processing-nztw3rk4jr'}}}]

# Wait for the Pipeline to Complete

# _Note: If this cell errors out with `WaiterError: Waiter PipelineExecutionComplete failed: Max attempts exceeded`, just re-run it and keep waiting._

In [69]:
%store -r pipeline_name

In [None]:
%%time

import time
from pprint import pprint

# Current status of the pipeline's execution
executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

# Poll every 30 seconds until the execution leaves the "Executing" state.
# Sleeping on every iteration (not only after an error) avoids hammering the API.
while pipeline_execution_status == "Executing":
    time.sleep(30)
    try:
        executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
        pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
    except Exception:
        # Transient failures (e.g. API throttling): report and retry on the next iteration
        print("Please wait...")

pprint(executions_response)
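The polling loop above can be factored into a reusable helper that also stops on a bounded number of attempts instead of looping forever. A minimal sketch, assuming the terminal execution statuses are `Succeeded`, `Failed` and `Stopped`; `wait_for_status` and `TERMINAL_STATUSES` are hypothetical names, and the status source is passed in as a plain callable so the logic can be exercised without AWS:

```python
import time
from typing import Callable

# Assumed statuses after which a pipeline execution no longer changes
TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_status(
    get_status: Callable[[], str],
    delay: float = 30.0,
    max_attempts: int = 120,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it returns a terminal status.

    Exceptions raised by get_status() are reported and retried.
    Raises TimeoutError if no terminal status is seen within max_attempts polls.
    """
    for attempt in range(max_attempts):
        try:
            status = get_status()
        except Exception as err:  # e.g. throttling from the API
            print(f"Attempt {attempt + 1}: {err!r}, retrying...")
        else:
            if status in TERMINAL_STATUSES:
                return status
        sleep(delay)
    raise TimeoutError(f"No terminal status after {max_attempts} attempts")
```

In this notebook, `get_status` could be a small function that calls `sm.list_pipeline_executions(PipelineName=pipeline_name)` and returns `["PipelineExecutionSummaries"][0]["PipelineExecutionStatus"]`. Raising `TimeoutError` after `max_attempts` mirrors the bounded behaviour of the SDK waiter behind the `WaiterError` mentioned in the note, while the injectable `sleep` keeps the helper easy to test.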

# Wait for the Pipeline ^^ Above ^^ to Complete

In [71]:
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

Executing


In [72]:
pipeline_execution_arn = executions_response[0]["PipelineExecutionArn"]
print(pipeline_execution_arn)

arn:aws:sagemaker:us-east-1:093729152554:pipeline/kg-pipeline-1631099753/execution/himz92gfbv1u


We can list the execution steps to inspect their status and artifacts:

# List Pipeline Execution Steps

In [73]:
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

Executing


In [74]:
from pprint import pprint

steps = sm.list_pipeline_execution_steps(PipelineExecutionArn=pipeline_execution_arn)

pprint(steps)

{'PipelineExecutionSteps': [{'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:093729152554:processing-job/pipelines-himz92gfbv1u-processing-nztw3rk4jr'}},
                             'StartTime': datetime.datetime(2021, 9, 8, 11, 17, 38, 721000, tzinfo=tzlocal()),
                             'StepName': 'Processing',
                             'StepStatus': 'Executing'}],
 'ResponseMetadata': {'HTTPHeaders': {'content-length': '265',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Wed, 08 Sep 2021 11:18:22 GMT',
                                      'x-amzn-requestid': '93ce059a-5564-4e9c-a05c-95e62d55df16'},
                      'HTTPStatusCode': 200,
                      'RequestId': '93ce059a-5564-4e9c-a05c-95e62d55df16',
                      'RetryAttempts': 0}}
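The raw response above is verbose. A minimal sketch that condenses a `list_pipeline_execution_steps` response into `{step name: (status, job name)}`, assuming that job-backed steps carry a metadata entry whose key ends in `Job` (such as the `ProcessingJob` entry shown above) with an `Arn` field; `summarize_steps` is a hypothetical helper:

```python
from typing import Dict, Optional, Tuple


def summarize_steps(steps_response: dict) -> Dict[str, Tuple[str, Optional[str]]]:
    """Map each step name to (status, job name).

    Only metadata entries whose key ends in "Job" (e.g. ProcessingJob, TrainingJob)
    are treated as jobs; the job name is the last path segment of the job ARN.
    Steps without such an entry get None as their job name.
    """
    summary = {}
    for step in steps_response.get("PipelineExecutionSteps", []):
        job_name = None
        for key, value in step.get("Metadata", {}).items():
            if key.endswith("Job") and isinstance(value, dict) and "Arn" in value:
                job_name = value["Arn"].split("/")[-1]
        summary[step["StepName"]] = (step["StepStatus"], job_name)
    return summary


# Trimmed copy of the response printed above (timestamps and ResponseMetadata dropped)
sample = {
    "PipelineExecutionSteps": [
        {
            "Metadata": {"ProcessingJob": {"Arn": "arn:aws:sagemaker:us-east-1:093729152554:processing-job/pipelines-himz92gfbv1u-processing-nztw3rk4jr"}},
            "StepName": "Processing",
            "StepStatus": "Executing",
        }
    ]
}
print(summarize_steps(sample))
# {'Processing': ('Executing', 'pipelines-himz92gfbv1u-processing-nztw3rk4jr')}
```

In the notebook, `summarize_steps(steps)` gives a one-line view of where the pipeline currently is.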


# List All Artifacts Generated By The Pipeline

In [75]:
processing_job_name = None
training_job_name = None

In [76]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())

for execution_step in reversed(steps["PipelineExecutionSteps"]):
    print(execution_step)
    # Processing and Train steps are handled by job name because LineageTableVisualizer appears to mishandle the Processing step
    if execution_step["StepName"] == "Processing":
        processing_job_name = execution_step["Metadata"]["ProcessingJob"]["Arn"].split("/")[-1]
        print(processing_job_name)
        display(viz.show(processing_job_name=processing_job_name))
    elif execution_step["StepName"] == "Train":
        training_job_name = execution_step["Metadata"]["TrainingJob"]["Arn"].split("/")[-1]
        print(training_job_name)
        display(viz.show(training_job_name=training_job_name))
    else:
        display(viz.show(pipeline_execution_step=execution_step))
        time.sleep(5)
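The loop above chooses how to call `viz.show()` from the step name (`"Processing"`, `"Train"`). A minimal sketch of the same dispatch keyed on the step's metadata type instead, so that renaming a step does not silently send it down the generic branch; it assumes the `ProcessingJob` / `TrainingJob` metadata layout used in the loop, and `lineage_show_kwargs` is a hypothetical helper:

```python
def lineage_show_kwargs(step: dict) -> dict:
    """Choose keyword arguments for LineageTableVisualizer.show() for one step.

    Dispatches on the metadata type rather than the step name; the job name is
    the last path segment of the job ARN, as in the loop above.
    """
    metadata = step.get("Metadata", {})
    if "ProcessingJob" in metadata:
        return {"processing_job_name": metadata["ProcessingJob"]["Arn"].split("/")[-1]}
    if "TrainingJob" in metadata:
        return {"training_job_name": metadata["TrainingJob"]["Arn"].split("/")[-1]}
    # Any other step type falls back to the generic pipeline-step view
    return {"pipeline_execution_step": step}
```

Inside the loop, the body would then reduce to `display(viz.show(**lineage_show_kwargs(execution_step)))`.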

INFO:botocore.credentials:Credentials found in config file: ~/.aws/config


{'StepName': 'Processing', 'StartTime': datetime.datetime(2021, 9, 8, 11, 17, 38, 721000, tzinfo=tzlocal()), 'StepStatus': 'Executing', 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:093729152554:processing-job/pipelines-himz92gfbv1u-processing-nztw3rk4jr'}}}
pipelines-himz92gfbv1u-processing-nztw3rk4jr


|   | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...08-11-17-31-055/input/code/preprocess.py | Input | DataSet | ContributedTo | artifact |
| 1 | s3://sm-nlp-data/ie-baseline/raw/DuIE_2_0.zip | Input | DataSet | ContributedTo | artifact |
| 2 | 68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3 | Input | Image | ContributedTo | artifact |
| 3 | s3://sm-nlp-data/ie-baseline/processed/ | Output | DataSet | Produced | artifact |
