In [1]:
#!pip install botocore
from botocore.exceptions import ClientError

import os
import sagemaker
import logging
import boto3
import sagemaker 
import pandas as pd

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

import botocore.config

config = botocore.config.Config(
    user_agent_extra = 'dsoaws/1.0'
    )

sm = boto3.Session().client(service_name="sagemaker",
                           region_name = region,
                            config = config)
s3 = boto3.Session().client(service_name="s3",
                           region_name = region, 
                           config = config)
featurestore_runtime = boto3.Session().client(service_name = "sagemaker-featurestore-runtime",
                                             region_name=region,
                                             config=config)


### Set S3 Source Location (Public S3 Bucket)

In [2]:
s3_public_path_tsv ="s3://amazon-reviews-pds/tsv"

In [3]:
%store s3_public_path_tsv

Stored 's3_public_path_tsv' (str)


### Set S3 Destination Location (Our Private S3 Bucket)


In [4]:
s3_private_path_tsv ="s3://{}/amazon-reviews-pds/tsv".format(bucket)
print(s3_private_path_tsv)

s3://sagemaker-us-east-1-252915492200/amazon-reviews-pds/tsv


In [5]:
%store s3_private_path_tsv

Stored 's3_private_path_tsv' (str)


### Copy data from the Public S3 Bucket to our Private S3 Bucket in this Account

In [6]:
!aws s3 cp --recursive $s3_public_path_tsv $s3_private_path_tsv/ --exclude "*" --include "amazon_reviews_us_Digital_Software_v1_00.tsv.gz"
!aws s3 cp --recursive $s3_public_path_tsv $s3_private_path_tsv/ --exclude "*" --include "amazon_reviews_us_Digital_Video_Games_v1_00.tsv.gz"
!aws s3 cp --recursive $s3_public_path_tsv $s3_private_path_tsv/ --exclude "*" --include "amazon_reviews_us_Gift_Gard_v1_00.tsv.gz"

copy: s3://amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Software_v1_00.tsv.gz to s3://sagemaker-us-east-1-252915492200/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Software_v1_00.tsv.gz
copy: s3://amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Video_Games_v1_00.tsv.gz to s3://sagemaker-us-east-1-252915492200/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Video_Games_v1_00.tsv.gz


### Track the Pipeline as an Experiment

In [7]:
import time

In [8]:
%store -r pipeline_name
try :
    print("Using existing pipeline: {}".format(pipeline_name))
except NameError :
    timestamp = int(time.time())
    pipeline_name = "BERT-pipeline-{}".format(timestamp)
    print("Creating Pipeline Name:",pipeline_name)

Using existing pipeline: BERT-pipeline-1651055982


In [9]:
running_executions = 0
completed_executions = 0

try :
    existing_pipeline_executions_response = sm.list_pipeline_execution(
        PipelineName = pipeline_name,
        SortOrder = "Descending"
    )
    if "PipelineExecutionSummaries" in existing_pipeline_execution_response.keys():
        if len(existing_pipeline_execution_response["PipelineExecutionSummaries"]) >0:
            execution = existing_pipeline_execution_response["PipelineExecutionSummaries"][0]
            if "PipelineExecutionSummaries" in execution:
                if execution["PipelineExecutionSummaries"] == "Executing":
                    running_executions = running_executions + 1
                else :
                    completed_executions = completed_executions + 1
            print("[INFO] You have {} Pipeline execution(s) currently running and {} execution(s) completed".format(
            running_executions,completed_executions)
                 )
    else :
        print("[OK] Please continues")
except:
    pass

if running_executions ==0:
    timestamp = int(time.time())
    pipeline_name = "BERT-pipeline-{}".format(timestamp)
    print("Created Pipeline Name:",pipeline_name)
            
    
                    


Created Pipeline Name: BERT-pipeline-1651066961


In [10]:
print(pipeline_name)

BERT-pipeline-1651066961


In [11]:
%store pipeline_name

Stored 'pipeline_name' (str)


### Create the Trial

In [12]:
from smexperiments.trial import Trial

In [13]:
%store -r pipeline_trial_name

try:
    pipeline_trial_name
except NameError:
    timestamp = int(time.time())
    pipeline_trial = Trial.create(
        trial_name="Trial-{}".format(timestamp),experiment_name = pipeline_experiment_name, sagemaker_boto_client =sm
    )
    pipeline_trial_name = pipeline_trial.trial_name
    print("Created Trial Name :{}".format(pipeline_trial_name))

In [14]:
print(pipeline_trial_name)

trial-1650926425


In [15]:
%store pipeline_trial_name

Stored 'pipeline_trial_name' (str)


### Define Parameters to Parametrize Pipeline Execution

We define Workflow Parameters by which we can parametrize our Pipeline and vary the values injected and used in Pipeline executions and schedules without to modify the Pipeline defintion.
The supported parameter types include :
- ParameterString : representing a str Python type
- ParameterInteger :representing a int Python type
- ParameterFloat :representing a float Python type


In [16]:
from sagemaker.workflow.parameters import(
    ParameterString ,
    ParameterInteger,
    ParameterFloat
)

In [17]:
raw_input_data_s3_url = "/bert_pipeline/biography.csv"
print(raw_input_data_s3_url)

/bert_pipeline/biography.csv


In [18]:
!aws s3 ls $raw_input_data_s3_url

2022-04-04 08:53:03 aws-athena-query-results-252915492200-us-east-1
2022-04-04 09:23:03 aws-athena-query-results-us-east-1-252915492200
2022-04-04 08:41:42 aws-cloudtrail-logs-252915492200-3b317ae5
2022-04-22 16:13:30 sagemaker-project-p-co7l6mjhioym
2022-04-22 17:06:57 sagemaker-project-p-naedfukawvvw
2022-04-20 12:50:25 sagemaker-project-p-uiyuvpon0dkv
2022-04-22 17:11:02 sagemaker-project-p-ydr3lke0ekeu
2022-04-20 12:38:25 sagemaker-studio-252915492200-8wcjntn99wb
2022-04-20 12:39:29 sagemaker-studio-252915492200-kud7xcrx35
2022-04-25 21:21:12 sagemaker-us-east-1-252915492200
2022-04-22 17:34:01 sagemaker-us-west-2-252915492200


In [19]:
import time 

timestamp = int(time.time())

input_data = ParameterString(
        name="InputData",
        default_value = raw_input_data_s3_url,
)

processing_instance_count = ParameterInteger(
    name = "ProcessingInstanceCount",
    default_value =1,
)

processing_instance_type = ParameterString(
    name = "ProcessingInstanceType",
    default_value = "ml.c5.2xlarge",
)

max_seq_length = ParameterInteger(
    name = "MaxSeqLength",
    default_value = 64,
)

balance_dataset = ParameterString(
    name = "BalanceDataset",
    default_value = "True",
)

train_split_percentage = ParameterFloat(
    name = "TrainSplitPercentage",
    default_value = 0.90,
)

validation_split_percentage = ParameterFloat(
    name = "ValidationSplitPercentage",
    default_value = 0.05,
)

test_split_percentage = ParameterFloat(
    name = "ValidationSplitPercentage",
    default_value = 0.05,
)

feature_store_offline_prefix = ParameterString(
    name = "FeaturesStoreOfflinePrefix",
    default_value = "reviews-features-store-" + str(timestamp),
)

feature_group_name = ParameterString(
    name = "FeaturesGroupName",
    default_value = "reviews-features-group-" + str(timestamp),
)

We create an instance of an SKlearnProcessor processor and we use that in our ProcessingStep

In [20]:
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    framework_version= "0.23-1",
    role = role,
    instance_type = processing_instance_type,
    instance_count = processing_instance_count,
    env={"AWS_DEFAULT_REGION":region})

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [21]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processing_inputs = [
    ProcessingInput(
        input_name = "raw-input-data",
        source = input_data,
        destination = "/opt/ml/processing/input/data/",
        s3_data_distribution_type = "ShardedByS3Key",
    )
    
]

processing_outputs = [
    ProcessingOutput(
        output_name = "bert-train",
        s3_upload_mode ="EndOfJob",
        source = "/opt/ml/processing/output/bert/train",
    ),
        
    ProcessingOutput(
        output_name = "bert-validation",
        s3_upload_mode = "EndofJob",
        source  = "/opt/ml/processing/output/bert/validation"
    ),
    ProcessingOutput(
        output_name = "bert-test",
        s3_upload_mode = "EndofJob",
        source  = "/opt/ml/processing/output/bert/test"
    ),
]
processing_step = ProcessingStep(
    name = "Processing",
    code = "preprocessing.py",
    processor = processor,
    inputs = processing_inputs,
    outputs= processing_outputs,
    job_arguments = [
        "--train-split-percentage",
        str(train_split_percentage.default_value),
        "--validation-split-percentage",
        str(validation_split_percentage.default_value),
        "--test-split-percentage",
        str(test_split_percentage.default_value),
        "--max-seq-length",
        str(max_seq_length.default_value),
        "--balance-dataset",
        str(balance_dataset.default_value),
        "--feature-store-offline-prefix",
        str(feature_store_offline_prefix.default_value),
        "--feature-group-name",
        str(feature_group_name.default_value),
    ],
)

print(processing_step)

ProcessingStep(name='Processing', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)


In [22]:
train_instance_type = ParameterString(name="TrainInstanceType", default_value = "ml.c5.9xlarge")
train_instance_count = ParameterInteger(name="TrainInstanceCount", default_value = 1)

### Setup Training Hyper-Parameters 
Note that max_seq_length is re-used from the processing hyper-parameters above

In [23]:
epochs = ParameterInteger(name="Epochs", default_value=1)

learning_rate = ParameterFloat(name="LearningRate", default_value=0.00001)

epsilon = ParameterFloat(name="Epsilon", default_value=0.00000001)

train_batch_size = ParameterInteger(name="TrainBatchSize", default_value=128)

validation_batch_size = ParameterInteger(name="ValidationBatchSize", default_value=128)

test_batch_size = ParameterInteger(name="TestBatchSize", default_value=128)

train_steps_per_epoch = ParameterInteger(name="TrainingStepPerEpoch", default_value=50)

validation_steps = ParameterInteger(name="ValidationSteps",default_value=50)

test_steps = ParameterInteger(name="TestSteps",default_value=50)

train_volume_size = ParameterInteger(name = "TrainVolumeSize",default_value=256)

use_xla= ParameterString(
    name ="UseXLA",
    default_value = "True",
)

use_map = ParameterString(
    name = "UseAMP",
    default_value = "True",
)

freeze_bert_layer = ParameterString(
    name = "freezeBERTlayer",
    default_value = "False",
)

enable_sagemaker_debugger = ParameterString(
    name = "EnableSageMakerDebugger",
    default_value = "False"
)

enable_checkpointing = ParameterString(
    name = "EnableCheckpointing",
    default_value = "False",
)

enable_tensorboard = ParameterString(
    name = "EnableTensorboard",
    default_value = "False"
)

input_mode = ParameterString(
    name = "InputMode",
    default_value = "File",
    )

run_test = ParameterString(
    name = "RunTest",
    default_value = "False",
)

run_validation = ParameterString(
    name = "RunValidation",
    default_value = "False",
)

run_sample_predictions = ParameterString(
    name = "RunSamplePredictions",
    default_value = "False",
)



### Setup Metrics to Track Model Performance

In [24]:
metrics_definitions = [
    {"Name":"train:loss", "Regex":"loss: ([0-9\\.]+)"},
    {"Name":"train:accuracy", "Regex":"accuracy: ([0-9\\.]+)"},
    {"Name":"validation:loss", "Regex":"val_loss: ([0-9\\.]+)"},
    {"Name":"validation:accuracy", "Regex":"val_accuracy: ([0-9\\.]+)"},
]

### Setup Debugger and Profile

In [25]:
from sagemaker.debugger import Rule, ProfilerRule, rule_configs
from sagemaker.debugger import DebuggerHookConfig
from sagemaker.debugger import ProfilerConfig, FrameworkProfile

debugger_hook_config = DebuggerHookConfig(
    s3_output_path = "s3://{}".format(bucket),
)

profiler_config = ProfilerConfig(
    system_monitor_interval_millis = 500,
    framework_profile_params = FrameworkProfile(local_path="/opt/ml/output/profile",start_step=5, num_steps=10),
    )

In [26]:
rules =[ProfilerRule.sagemaker(rule_configs.ProfilerReport())]

### Create the estimator 
We configure an Estimator and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, train a model and saves the model to "model_dir" so that it can be hosted later.
We also specify the model_path where the models from training will be saved

In [27]:
from sagemaker.workflow.properties import PropertyFile
evaluation_report = PropertyFile(name="EvaluationReport",output_name = "metrics",path="evaluation.json")

In [28]:
from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(
    entry_point = "evaluation.py",
    #source_dir = "bert_pipeline/",
    role = role,
    instance_count = train_instance_count, # Make sure you have at least this number of input files 
    instance_type = train_instance_type,
    volume_size = train_volume_size,
    py_version = "py37",
    framework_version ="2.3.1",
    hyperparameters={
        "epochs":epochs,
        "learning_rate": learning_rate,
        "train_batch_size": train_batch_size,
        "validation_batch_size": train_batch_size,
        "test_batch_size": test_batch_size,
        "train_steps_per_epoch":train_steps_per_epoch,
        "validation_steps": validation_steps,
        "test_steps": test_steps,
        "usa_xla": use_xla,
        "use_map": use_map,
        "max_seq_length": max_seq_length,
        "freeze_bert_layer": freeze_bert_layer,
        "enable_sagemaker_debugger": enable_sagemaker_debugger,
        "enable_checkpointing":enable_checkpointing,
        "enable_tensorboard":enable_tensorboard,
        "run_validation" : run_validation,
        "run_test":run_test,
        "run_sample_predictions": run_sample_predictions,
        
    },
    input_mode = input_mode,
    metrics_definitions = metrics_definitions,
    debugger_hook_config = debugger_hook_config,
    profiler_config = profiler_config,
    rules = rules 
    
    
    
)

In [29]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

training_step = TrainingStep(
    name = "Train",
    estimator = estimator,
    inputs={
        "train": TrainingInput(
        s3_data= processing_step.properties.ProcessingOutputConfig.Outputs["train-bert"].S3Output.S3Uri,
        content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data = processing_step.properties.ProcessingOutputConfig.Outputs["bert-validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "test": TrainingInput(
            s3_data = processing_step.properties.ProcessingOutputConfig.Outputs["bert-test"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        
    },
    #cache_config =cache_config,
)
print(training_step)

TrainingStep(name='Train', display_name=None, description=None, step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None)


## Evaluation Step

First, we develop an evaluation script that will be specified in a Processing step that will perform the model evaluation.
The evaluation script "evaluation.py" takes the trained_model and the test dataset as input, and produce a JSON file containing  classification evaluation metrics such as accuracy.
After the pipeline execution, we will examine the resulting "evaluation.json" for analysis.

The evaluation script :
- loads in the model
- reads in the test data
- issues a bunch of predictions against the test data
- build a classification report, including the accuracy
- save the evaluation report to the evaluation directory

Next we create an instance of a "ScriptProcessor" processor and we use that in our ProcessingStep.


In [30]:
from sagemaker.sklearn.processing import SKLearnProcessor

evaluation_processor = SKLearnProcessor(
    framework_version = "0.23-1",
    role = role,
    instance_type = processing_instance_type,
    instance_count = processing_instance_count,
)

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [31]:
evaluation_step = ProcessingStep(
    name = "EvaluateModel",
    processor = evaluation_processor,
    code = "evaluation.py",
    inputs = [
        ProcessingInput(
            source= training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination = "/opt/ml/processing/input/model",
        ),
        ProcessingInput(
            source= processing_step.properties.ProcessingInputs["raw-input-data"].S3Input.S3Uri,
            destination = "/opt/ml/processing/input/data",
        ),
    ],
    outputs = [
        ProcessingOutput(
        output_name = "metrics", s3_upload_mode="EndOfJob", source ="/opt/ml/processing/output/metrics/"
        ),
    ],
    job_arguments=[
        "--max-seq-length",
        str(max_seq_length.default_value)
    ],
    property_files = [evaluation_report]
)

In [32]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics= MetricsSource(
        s3_uri="{}/evaluation.json".format(
        evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    ),
    content_type="application/json"
)
)
print(model_metrics)

<sagemaker.model_metrics.ModelMetrics object at 0x7f339072a5d0>


## Register Model Step
We use the estimator instance that was used for the training step to construct an instance of RegisterModel. The result of executing RegisterModel in a pipeline is a Model Package. A Model Package is a reusable model artifacts abstraction that packages all ingredients necessary for inference. Primarly, it consists of an inference specification that defines the inference image to use along with an optional model weights location.

A Model Package Group is a collection of ModelPackages.

In [33]:
model_approval_status = ParameterString(name="ModelApprovalStatus",default_value ="PendingManualApproval")
deploy_instance_type = ParameterString(name ="DeployInstanceType", default_value ="ml.m5.4xlarge")
deploy_instance_count = ParameterInteger(name ="DeployInstanceCount", default_value=1)

In [34]:
model_package_group_name = f"BERT-reviews-{timestamp}"
print(model_package_group_name)

BERT-reviews-1651066963


In [35]:
inference_image_uri = sagemaker.image_uris.retrieve(
    framework ="tensorflow",
    region=region,
    version ="2.3.1",
    instance_type = deploy_instance_type,
    image_scope = "inference",
)

print(inference_image_uri)

763104351884.dkr.ecr.us-east-1.amazonaws.com/tensorflow-inference:2.3.1-cpu


In [36]:
from sagemaker.workflow.step_collections import RegisterModel

register_step = RegisterModel(
    name ="RegisterModel",
    estimator =estimator,
    image_uri = inference_image_uri, # We have to specify, by default it's using he training image 
    model_data = training_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types = ["application/jsonlines"],
    response_types = ["application/jsonlines"],
    inference_instances = deploy_instance_type,
    transform_instances = ["ml.m5.4xlarge"],
    model_package_group_name = model_package_group_name,
    approval_status = model_approval_status,
    model_metrics = model_metrics,
    
)

## Create Model for Deployment Step

In [37]:
from sagemaker.model import Model

model_name = "bert-model-{}".format(timestamp)

model = Model(
    name = model_name,
    image_uri = inference_image_uri,
    model_data = training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session = sess,
    role = role,
)

In [38]:
from sagemaker.inputs import CreateModelInput

create_inputs = CreateModelInput(
    instance_type=deploy_instance_type
)

In [39]:
from sagemaker.workflow.steps import CreateModelStep

create_step = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs =create_inputs,
)

## Conditional Deployment Step

Below we do the following:
- define a "ConditionGreaterThan" on the accuracy value found in the output of the evaluation step
- use the condition in the list of the conditions in a "ConditionStep"
- pass the "RegisterModel" step collection into the "if_steps" of the "ConditionStep"


In [40]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet)

min_accuracy_value = ParameterFloat(name="MinAccuracyValue",default_value=0.90)

minimum_accuracy_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet[
    step=evaluation_step,
    property_file=evaluation_report,
    json_path="metrics.accuracy.value",
    ],
    right=min_accuracy_value,
)
minimum_accuracy_condition_step = ConditionStep(
    name="AccuracyCondition", 
    conditions=[minimum_accuracy_condition],
    if_steps=[register_step,create_step], # success, continue with model registration
    else_steps=[], #fail, end the pipeline
)

See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


## Define a Pipeline of Parameters, Steps, and Conditions

Let's tie all up into a workflow pipeline so we can execute it and even schedule it.
A pipeline requires a name, parameters, and steps. Names must be unique within an (account, region) pair so we tack on the timestamp to the name.
Note :
- All the parameters used in the definitions must be present.
- Steps passed into the pipeline need to be in the order of execution. The SageMaker Worflow service will resolve the data dependency DAG as steps the execution complete.
- Steps must be unique to either pipeline step list or a single condition step if/else list.



### Submit the pipeline to SageMaker for Execution

Let's submit our pipeline definition to the workflow service. The role passed in will be used by the workflow service to create all the job defined in the steps.

### Create the pipeline

In [41]:
from sagemaker.workflow.pipeline import Pipeline

existing_pipelines = 0

existing_pipelines_response = sm.list_pipelines(
    PipelineNamePrefix=pipeline_name,
    SortOrder="Descending",
)

if "PipelineSummaries" in existing_pipelines_response.keys():
    if len(existing_pipelines_response["PipelineSummaries"])>0:
        existing_pipelines= existing_pipelines + 1
        print("[INFO] You already have created {} pipeline with name {}".format(existing_pipelines,pipeline_name))
    else :
        pass

if existing_pipelines ==0: # Only create the pipeline one time
    pipeline = Pipeline(
        name = pipeline_name,
        parameters = [
            input_data,
            processing_instance_count,
            processing_instance_type,
            max_seq_length,
            balance_dataset,
            train_split_percentage,
            validation_split_percentage,
            test_split_percentage,
            feature_store_offline_prefix,
            feature_group_name,
            train_instance_type,
            train_instance_count,
            epochs,
            learning_rate,
            epsilon,
            train_batch_size,
            validation_batch_size,
            test_batch_size,
            train_steps_per_epoch,
            validation_steps,
            test_steps,
            train_volume_size,
            use_xla,
            use_map,
            freeze_bert_layer,
            enable_sagemaker_debugger,
            enable_checkpointing,
            enable_tensorboard,
            input_mode,
            run_validation,
            run_test,
            run_sample_predictions,
            min_accuracy_value,
            model_approval_status,
            deploy_instance_type,
            deploy_instance_count, 
        ],
        steps=[processing_step,training_step,evaluation_step,minimum_accuracy_condition_step],
        sagemaker_session=sess,
    )
    pipeline.create(role_arn=role)["PipelineArn"]
    print("Created pipeline with name {}".format(pipeline_name))
else :
    print("Error")
        

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: latest.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


ClientError: An error occurred (ValidationException) when calling the CreatePipeline operation: Unable to parse pipeline definition. Expecting start of Json Array.

In [None]:
execution.list_steps()