# SageMaker Pipeline to train and deploy BERT


**BERT Pipeline**

The pipeline contains pre-processing, training, evaluation, and model registration.


## Install the required modules.

In [3]:
!pip install --disable-pip-version-check -q sagemaker==2.35.0

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
sparkmagic 0.20.4 requires nest-asyncio==1.5.5, but you have nest-asyncio 1.5.6 which is incompatible.[0m[31m
[0m

In [4]:
import os
import sagemaker
import logging
import boto3
import sagemaker
import pandas as pd
import json
import botocore
from botocore.exceptions import ClientError

# One botocore config shared by both clients; it appends a user-agent suffix to their calls.
config = botocore.config.Config(user_agent_extra='dlai-pds/c2/w3')

# Low-level SageMaker control-plane and runtime clients.
sm = boto3.client(service_name='sagemaker', config=config)
sm_runtime = boto3.client('sagemaker-runtime', config=config)

# High-level SageMaker session built on top of the two clients above.
sess = sagemaker.Session(
    sagemaker_client=sm,
    sagemaker_runtime_client=sm_runtime,
)

region = sess.boto_region_name
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()

## Setup the pipeline name

In [5]:
import time

# Epoch-second suffix so pipeline names from different runs do not collide.
timestamp = int(time.time())
pipeline_name = f'BERT-pipeline-{timestamp}'

<a name='c2w3-1.'></a>
# 1. Configure the dataset and processing step

<a name='c2w3-1.1.'></a>
### 1.1. Configure S3 path for raw input data

The raw dataset is in the public S3 bucket.

In [7]:
# Public S3 prefix that holds the raw reviews CSV.
raw_input_data_s3_uri = (
    's3://dlai-practical-data-science/data/raw/'
)
print(raw_input_data_s3_uri)

s3://dlai-practical-data-science/data/raw/


In [8]:
!aws s3 ls $raw_input_data_s3_uri

2021-04-30 02:21:06    8457214 womens_clothing_ecommerce_reviews.csv


<a name='c2w3-1.2.'></a>
### 1.2. Configure processing step


Create workflow parameters of a specific type: integer, string, or float.

In [6]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

Set the parameters for the processing step.

In [9]:
# Processing-job infrastructure.
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.c5.2xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# Dataset split sizes. Despite the "Percentage" names, the defaults are fractions that sum to 1.0.
train_split_percentage = ParameterFloat(name="TrainSplitPercentage", default_value=0.90)
validation_split_percentage = ParameterFloat(name="ValidationSplitPercentage", default_value=0.05)
test_split_percentage = ParameterFloat(name="TestSplitPercentage", default_value=0.05)

# Preprocessing options. BalanceDataset is a string parameter ("True" by default), not a bool.
balance_dataset = ParameterString(name="BalanceDataset", default_value="True")
max_seq_length = ParameterInteger(name="MaxSeqLength", default_value=128)

# Feature Store names, suffixed with the run timestamp so they differ between runs.
feature_store_offline_prefix = ParameterString(
    name="FeatureStoreOfflinePrefix",
    default_value="reviews-feature-store-" + str(timestamp),
)
feature_group_name = ParameterString(
    name="FeatureGroupName",
    default_value="reviews-feature-group-" + str(timestamp),
)

# S3 prefix of the raw input data.
input_data = ParameterString(name="InputData", default_value=raw_input_data_s3_uri)

Set up a scikit-learn-based processor, passing the SageMaker execution role, the processing instance type, and the instance count.

In [10]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Scikit-learn processing container (framework 0.23-1) sized by the pipeline parameters.
processor = SKLearnProcessor(
    role=role,
    framework_version='0.23-1',
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    env=dict(AWS_DEFAULT_REGION=region),
)

Use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that runs when the pipeline executes this step.

In [11]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Single input channel: the raw data prefix (a pipeline parameter), sharded across instances by S3 key.
processing_inputs = [
    ProcessingInput(
        input_name='raw-input-data',
        source=input_data,
        destination='/opt/ml/processing/input/data/',
        s3_data_distribution_type='ShardedByS3Key',
    ),
]

# One output channel per dataset split, each uploaded to S3 when the job ends.
processing_outputs = [
    ProcessingOutput(
        output_name=channel_name,
        source=container_path,
        s3_upload_mode='EndOfJob',
    )
    for channel_name, container_path in (
        ('sentiment-train', '/opt/ml/processing/output/sentiment/train'),
        ('sentiment-validation', '/opt/ml/processing/output/sentiment/validation'),
        ('sentiment-test', '/opt/ml/processing/output/sentiment/test'),
    )
]

# NOTE(review): every CLI value below is str(<parameter>.default_value). It is computed
# when this cell runs and baked into the pipeline definition, so overriding
# e.g. TrainSplitPercentage in pipeline.start(...) does not change what
# src/prepare_data.py receives.
processing_step = ProcessingStep(
    name='Processing',
    processor=processor,
    code='src/prepare_data.py',
    inputs=processing_inputs,
    outputs=processing_outputs,
    job_arguments=[
        token
        for flag, parameter in (
            ('--train-split-percentage', train_split_percentage),
            ('--validation-split-percentage', validation_split_percentage),
            ('--test-split-percentage', test_split_percentage),
            ('--balance-dataset', balance_dataset),
            ('--max-seq-length', max_seq_length),
            ('--feature-store-offline-prefix', feature_store_offline_prefix),
            ('--feature-group-name', feature_group_name),
        )
        for token in (flag, str(parameter.default_value))
    ],
)

print(processing_step)

ProcessingStep(name='Processing', step_type=<StepTypeEnum.PROCESSING: 'Processing'>)


## Print the list of processing job properties

In [12]:
# Each value is a sagemaker.workflow Properties placeholder (only its repr prints), not a concrete value.
print(json.dumps(vars(processing_step.properties), sort_keys=True, indent=4, default=str))

{
    "AppSpecification": "<sagemaker.workflow.properties.Properties object at 0x7fc3099c2050>",
    "AutoMLJobArn": "<sagemaker.workflow.properties.Properties object at 0x7fc3099c26d0>",
    "CreationTime": "<sagemaker.workflow.properties.Properties object at 0x7fc3099c2650>",
    "Environment": "<sagemaker.workflow.properties.Properties object at 0x7fc3099c2150>",
    "ExitMessage": "<sagemaker.workflow.properties.Properties object at 0x7fc3099c2510>",
    "ExperimentConfig": "<sagemaker.workflow.properties.Properties object at 0x7fc3099c2350>",
    "FailureReason": "<sagemaker.workflow.properties.Properties object at 0x7fc3099c2550>",
    "LastModifiedTime": "<sagemaker.workflow.properties.Properties object at 0x7fc3099c2610>",
    "MonitoringScheduleArn": "<sagemaker.workflow.properties.Properties object at 0x7fc3099c2690>",
    "NetworkConfig": "<sagemaker.workflow.properties.Properties object at 0x7fc3099c2190>",
    "ProcessingEndTime": "<sagemaker.workflow.properties.Properties

Pull the `sentiment-train` channel from the output configuration of the processing job.

In [13]:
# Placeholder for the 'sentiment-train' entry of the processing job's output configuration.
print(json.dumps(
    vars(processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-train']),
    sort_keys=True,
    indent=4,
    default=str,
))

{
    "AppManaged": "<sagemaker.workflow.properties.Properties object at 0x7fc3099ebc90>",
    "FeatureStoreOutput": "<sagemaker.workflow.properties.Properties object at 0x7fc3099ebcd0>",
    "OutputName": "<sagemaker.workflow.properties.Properties object at 0x7fc3099ebe90>",
    "S3Output": "<sagemaker.workflow.properties.Properties object at 0x7fc3099ebf90>",
    "_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train']",
    "_shape_name": "ProcessingOutput"
}


Pull and print the attributes of the S3 output path related to the `sentiment-train` output channel.

In [14]:
# The S3Uri placeholder for the 'sentiment-train' channel; its _path shows how it is referenced.
print(json.dumps(
    vars(processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri),
    sort_keys=True,
    indent=4,
    default=str,
))

{
    "__str__": "S3Uri",
    "_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri",
    "_shape_name": "S3Uri"
}


Pull and print out attributes of the S3 output path object related to the sentiment-test output channel.

In [15]:
# The S3Uri placeholder for the 'sentiment-test' channel.
print(json.dumps(
    vars(processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri),
    sort_keys=True,
    indent=4,
    default=str,
))

{
    "__str__": "S3Uri",
    "_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri",
    "_shape_name": "S3Uri"
}


Review the keys of the processing step's `arguments` dictionary.

In [16]:
# Top-level fields of the request the SDK builds for this processing job.
step_request = processing_step.arguments
step_request.keys()

dict_keys(['ProcessingResources', 'AppSpecification', 'RoleArn', 'ProcessingInputs', 'ProcessingOutputConfig', 'Environment'])

Pull and review processing inputs from the arguments of the processing step

In [17]:
# Input channels as sent to SageMaker (includes the auto-added 'code' channel).
step_request = processing_step.arguments
step_request['ProcessingInputs']

[{'InputName': 'raw-input-data',
  'AppManaged': False,
  'S3Input': {'S3Uri': ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://dlai-practical-data-science/data/raw/'),
   'LocalPath': '/opt/ml/processing/input/data/',
   'S3DataType': 'S3Prefix',
   'S3InputMode': 'File',
   'S3DataDistributionType': 'ShardedByS3Key',
   'S3CompressionType': 'None'}},
 {'InputName': 'code',
  'AppManaged': False,
  'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-357362156822/sagemaker-scikit-learn-2023-07-13-16-13-27-698/input/code/prepare_data.py',
   'LocalPath': '/opt/ml/processing/input/code',
   'S3DataType': 'S3Prefix',
   'S3InputMode': 'File',
   'S3DataDistributionType': 'FullyReplicated',
   'S3CompressionType': 'None'}}]


Pull and review configuration of the processing outputs from the arguments of the processing step.

In [18]:
# Output channels with the S3 destinations the SDK filled in.
step_request = processing_step.arguments
step_request['ProcessingOutputConfig']

{'Outputs': [{'OutputName': 'sentiment-train',
   'AppManaged': False,
   'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-357362156822/sagemaker-scikit-learn-2023-07-13-16-12-56-082/output/sentiment-train',
    'LocalPath': '/opt/ml/processing/output/sentiment/train',
    'S3UploadMode': 'EndOfJob'}},
  {'OutputName': 'sentiment-validation',
   'AppManaged': False,
   'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-357362156822/sagemaker-scikit-learn-2023-07-13-16-12-56-082/output/sentiment-validation',
    'LocalPath': '/opt/ml/processing/output/sentiment/validation',
    'S3UploadMode': 'EndOfJob'}},
  {'OutputName': 'sentiment-test',
   'AppManaged': False,
   'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-357362156822/sagemaker-scikit-learn-2023-07-13-16-12-56-082/output/sentiment-test',
    'LocalPath': '/opt/ml/processing/output/sentiment/test',
    'S3UploadMode': 'EndOfJob'}}]}

<a name='c2w3-2.'></a>
# 2. Configure training step

<a name='c2w3-2.1.'></a>
### 2.1. Define parameters

Set up the parameters for the workflow.

In [19]:
# Model / optimisation settings.
freeze_bert_layer = ParameterString(name="FreezeBertLayer", default_value="False")
epochs = ParameterInteger(name="Epochs", default_value=3)
learning_rate = ParameterFloat(name="LearningRate", default_value=0.00001)

# Batch sizes and steps per epoch for training and validation.
train_batch_size = ParameterInteger(name="TrainBatchSize", default_value=64)
train_steps_per_epoch = ParameterInteger(name="TrainStepsPerEpoch", default_value=50)
validation_batch_size = ParameterInteger(name="ValidationBatchSize", default_value=64)
validation_steps_per_epoch = ParameterInteger(name="ValidationStepsPerEpoch", default_value=50)

seed = ParameterInteger(name="Seed", default_value=42)
run_validation = ParameterString(name="RunValidation", default_value="True")

# Training infrastructure.
train_instance_count = ParameterInteger(name="TrainInstanceCount", default_value=1)
train_instance_type = ParameterString(name="TrainInstanceType", default_value="ml.c5.9xlarge")
train_volume_size = ParameterInteger(name="TrainVolumeSize", default_value=256)
input_mode = ParameterString(name="InputMode", default_value="File")

<a name='c2w3-2.2.'></a>
### 2.2. Configure hyper-parameters

Set up the hyperparameters dictionary.

In [20]:
# Hyperparameters for train.py. Values are pipeline Parameter objects, not literals.
hyperparameters = dict(
    max_seq_length=max_seq_length,
    freeze_bert_layer=freeze_bert_layer,
    epochs=epochs,
    learning_rate=learning_rate,
    train_batch_size=train_batch_size,
    train_steps_per_epoch=train_steps_per_epoch,
    validation_batch_size=validation_batch_size,
    validation_steps_per_epoch=validation_steps_per_epoch,
    seed=seed,
    run_validation=run_validation,
)

<a name='c2w3-2.3.'></a>
### 2.3. Configure model-evaluation metrics

Choose loss and accuracy as the evaluation metrics.

In [21]:
# Metrics SageMaker extracts from the training log: each regex captures the number
# printed after "val_loss:" / "val_acc:".
metric_definitions = [
    dict(Name='validation:loss', Regex='val_loss: ([0-9.]+)'),
    dict(Name='validation:accuracy', Regex='val_acc: ([0-9.]+)'),
]

### 2.4. Configure the PyTorchEstimator



In [22]:
from sagemaker.pytorch import PyTorch as PyTorchEstimator

# PyTorch 1.6.0 training job that runs src/train.py. Instance settings, input mode and
# hyperparameters are pipeline Parameter objects rather than literal values.
estimator = PyTorchEstimator(
    source_dir='src',
    entry_point='train.py',
    framework_version='1.6.0',
    py_version='py3',
    role=role,
    instance_type=train_instance_type,
    instance_count=train_instance_count,
    volume_size=train_volume_size,
    input_mode=input_mode,
    hyperparameters=hyperparameters,
    metric_definitions=metric_definitions,
)

### 2.5. Setup pipeline step caching


In [23]:
from sagemaker.workflow.steps import CacheConfig

# Step caching enabled, with entries expiring after one hour (ISO-8601 duration "PT1H").
cache_config = CacheConfig(expire_after="PT1H", enable_caching=True)


### 2.6. Configure the TrainingStep

In [24]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# Training channels come from the processing step's output S3 URIs (Properties placeholders).
training_step = TrainingStep(
    name='Train',
    estimator=estimator,
    inputs={
        channel: TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[output_name].S3Output.S3Uri,
            content_type='text/csv',
        )
        for channel, output_name in (
            ('train', 'sentiment-train'),
            ('validation', 'sentiment-validation'),
        )
    },
    cache_config=cache_config,
)

print(training_step)

TrainingStep(name='Train', step_type=<StepTypeEnum.TRAINING: 'Training'>)


Print the attributes of the training step properties.

In [26]:
# Attribute map of the training step's Properties placeholder.
vars(training_step.properties)

{'_path': 'Steps.Train',
 '_shape_name': 'DescribeTrainingJobResponse',
 'TrainingJobName': <sagemaker.workflow.properties.Properties at 0x7fc309f3a610>,
 'TrainingJobArn': <sagemaker.workflow.properties.Properties at 0x7fc309f3a5d0>,
 'TuningJobArn': <sagemaker.workflow.properties.Properties at 0x7fc309f3a710>,
 'LabelingJobArn': <sagemaker.workflow.properties.Properties at 0x7fc309f3a6d0>,
 'AutoMLJobArn': <sagemaker.workflow.properties.Properties at 0x7fc309f3a450>,
 'ModelArtifacts': <sagemaker.workflow.properties.Properties at 0x7fc309f3a410>,
 'TrainingJobStatus': <sagemaker.workflow.properties.Properties at 0x7fc309f3a4d0>,
 'SecondaryStatus': <sagemaker.workflow.properties.Properties at 0x7fc309f3a2d0>,
 'FailureReason': <sagemaker.workflow.properties.Properties at 0x7fc309f3a250>,
 'HyperParameters': <sagemaker.workflow.properties.Properties at 0x7fc309f3a390>,
 'AlgorithmSpecification': <sagemaker.workflow.properties.Properties at 0x7fc309f3a350>,
 'RoleArn': <sagemaker.workf

# 3. Configure model-evaluation step


Create an instance of the SKLearnProcessor to run evaluation script as a scikit-learn-based SageMaker processing job.

In [27]:
from sagemaker.sklearn.processing import SKLearnProcessor

# Same container and sizing as the data-preparation processor, capped at two hours of runtime.
evaluation_processor = SKLearnProcessor(
    role=role,
    framework_version='0.23-1',
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    env=dict(AWS_DEFAULT_REGION=region),
    max_runtime_in_seconds=2 * 60 * 60,
)

Set up the output `PropertyFile` so that later steps can read values from `evaluation.json`.

In [28]:
from sagemaker.workflow.properties import PropertyFile

# Exposes evaluation.json from the 'metrics' output channel so JsonGet can read values from it.
evaluation_report = PropertyFile(
    path='evaluation.json',
    output_name='metrics',
    name='EvaluationReport',
)

Use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that runs when the pipeline executes this step.

In [29]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# NOTE(review): --max-seq-length uses MaxSeqLength's default_value, so a runtime
# override of MaxSeqLength does not reach this script.
evaluation_step = ProcessingStep(
    name='EvaluateModel',
    code='src/evaluate_model_metrics.py',
    processor=evaluation_processor,
    inputs=[
        # Trained model artifact from the Train step.
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination='/opt/ml/processing/input/model',
        ),
        # Held-out test split from the Processing step.
        ProcessingInput(
            source=processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri,
            destination='/opt/ml/processing/input/data',
        ),
    ],
    outputs=[
        # Channel 'metrics' is where the evaluation_report PropertyFile looks for evaluation.json.
        ProcessingOutput(
            output_name='metrics',
            source='/opt/ml/processing/output/metrics/',
            s3_upload_mode='EndOfJob',
        ),
    ],
    job_arguments=['--max-seq-length', str(max_seq_length.default_value)],
    property_files=[evaluation_report],
)

<a name='c2w3-4.'></a>
# 4. Configure and register model step

### 4.1. Configure the model for deployment


In [30]:
# Registration / deployment settings.
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
deploy_instance_type = ParameterString(name="DeployInstanceType", default_value="ml.m5.large")
# NOTE(review): in the visible cells DeployInstanceCount is only listed among the pipeline
# parameters; no step consumes it.
deploy_instance_count = ParameterInteger(name="DeployInstanceCount", default_value=1)

In [31]:
# Model package group name, unique per run via the timestamp suffix.
model_package_group_name = 'BERT-Reviews-{}'.format(timestamp)
print(model_package_group_name)

BERT-Reviews-1689263605


Configure the ModelMetrics to be stored as metadata.

In [32]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

# Points the model package's statistics at evaluation.json under the evaluation step's first
# (and only) output channel.
# NOTE(review): this reads the concrete S3 URI the SDK generated when `arguments` was built,
# not a runtime property. It relies on the step writing to that same destination when it
# runs; re-check this if the SDK version changes.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        content_type="application/json",
        s3_uri=evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        + "/evaluation.json",
    )
)

print(model_metrics)


Define deployment image for inference.

In [33]:
# PyTorch 1.6.0 / py36 inference container.
# NOTE(review): the URI is resolved once, here, from DeployInstanceType's default
# ("ml.m5.large", which gives the CPU image shown in the output). Overriding
# DeployInstanceType with a GPU type at pipeline.start() would presumably still use this
# CPU image; confirm before relying on it.
inference_image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    version="1.6.0",
    py_version="py36",
    image_scope="inference",
    instance_type=deploy_instance_type,
    region=region,
)
print(inference_image_uri)

763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.6.0-cpu-py36



### 4.2. Register the model for deployment


Configure the register model step.


In [34]:
from sagemaker.workflow.step_collections import RegisterModel

# Registers the trained artifact in the model package group, served by the inference
# image with JSON Lines requests and responses, along with the evaluation metrics.
register_step = RegisterModel(
    name="RegisterModel",
    estimator=estimator,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    image_uri=inference_image_uri,
    content_types=["application/jsonlines"],
    response_types=["application/jsonlines"],
    inference_instances=[deploy_instance_type],
    transform_instances=[deploy_instance_type],  # batch transform is not used
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)


# 5. Create model for deployment step

### Exercise 5

Configure model for deployment.


In [35]:
from sagemaker.model import Model

# SageMaker Model combining the trained artifact with the inference image;
# the timestamp suffix keeps the model name unique per run.
model_name = f'bert-model-{timestamp}'

model = Model(
    name=model_name,
    role=role,
    sagemaker_session=sess,
    image_uri=inference_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
)

Configure the create-model input:

In [36]:
from sagemaker.inputs import CreateModelInput

# Instance type for the CreateModel step (the DeployInstanceType pipeline parameter).
create_inputs = CreateModelInput(instance_type=deploy_instance_type)



Configure create model step for the workflow.


In [37]:
from sagemaker.workflow.steps import CreateModelStep

# Pipeline step that creates the `model` object as a SageMaker model.
create_step = CreateModelStep(name="CreateModel", model=model, inputs=create_inputs)

# 6. Check accuracy condition step

Only register the model if its accuracy, as computed by the evaluation step, meets or exceeds a minimum value.

* define a minimum accuracy value as a parameter
* define a `ConditionGreaterThanOrEqualTo` on the accuracy value found in the output of the evaluation step, `evaluation_step`
* use the condition in the list of conditions in a `ConditionStep`
* pass the `RegisterModel` step collection and the `CreateModelStep` into the `if_steps` of the `ConditionStep`

In [38]:
# Accuracy threshold for the registration gate; the default is roughly chance level (1/3) for three classes.
min_accuracy_value = ParameterFloat(name="MinAccuracyValue", default_value=0.33)

In [39]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep, JsonGet

# Gate: metrics.accuracy.value from the evaluation report must be >= MinAccuracyValue.
minimum_accuracy_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=evaluation_step,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value",
    ),
    right=min_accuracy_value,
)

minimum_accuracy_condition_step = ConditionStep(
    name="AccuracyCondition",
    conditions=[minimum_accuracy_condition],
    # accuracy >= threshold: register the model and create the SageMaker model
    if_steps=[register_step, create_step],
    # accuracy < threshold: nothing is registered or created
    else_steps=[],
)


# 7. Create pipeline


### 7.1. Define a pipeline of parameters, steps, and conditions


In [40]:
from sagemaker.workflow.pipeline import Pipeline

# Parameters are grouped by concern; the concatenation keeps the original order.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=(
        # data + processing
        [input_data, processing_instance_count, processing_instance_type, max_seq_length,
         balance_dataset, train_split_percentage, validation_split_percentage,
         test_split_percentage, feature_store_offline_prefix, feature_group_name]
        # training
        + [epochs, learning_rate, train_batch_size, train_steps_per_epoch,
           validation_batch_size, validation_steps_per_epoch, freeze_bert_layer, seed,
           train_instance_count, train_instance_type, train_volume_size, input_mode,
           run_validation]
        # evaluation gate + registration/deployment
        + [min_accuracy_value, model_approval_status, deploy_instance_type,
           deploy_instance_count]
    ),
    steps=[processing_step, training_step, evaluation_step, minimum_accuracy_condition_step],
    sagemaker_session=sess,
)

Examine the JSON pipeline definition, which follows the SageMaker Pipelines DSL specification.


In [41]:
import json
from pprint import pprint

# Serialize the pipeline to its JSON DSL, then parse it back so pprint can lay it out.
definition = json.loads(pipeline.definition())
pprint(definition)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'Metadata': {},
 'Parameters': [{'DefaultValue': 's3://dlai-practical-data-science/data/raw/',
                 'Name': 'InputData',
                 'Type': 'String'},
                {'DefaultValue': 1,
                 'Name': 'ProcessingInstanceCount',
                 'Type': 'Integer'},
                {'DefaultValue': 'ml.c5.2xlarge',
                 'Name': 'ProcessingInstanceType',
                 'Type': 'String'},
                {'DefaultValue': 128,
                 'Name': 'MaxSeqLength',
                 'Type': 'Integer'},
                {'DefaultValue': 'True',
                 'Name': 'BalanceDataset',
                 'Type': 'String'},
                {'DefaultValue': 0.9,
                 'Name': 'TrainSplitPercentage',
                 'Type': 'Float'},
                {'DefaultValue': 0.05,
                 'Name': 'ValidationSplitPercentage',
                 'Type': 'Float'},
                {'DefaultValue': 0.05,
                 'Name': 'TestSplitPercenta


Create the pipeline and print its Amazon Resource Name (ARN).

In [42]:
# upsert() creates the pipeline, or updates it if a pipeline with this name already
# exists, so re-running this cell does not fail the way create() would.
response = pipeline.upsert(role_arn=role)

pipeline_arn = response["PipelineArn"]
print(pipeline_arn)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


arn:aws:sagemaker:us-east-1:357362156822:pipeline/BERT-pipeline-1689263605


### 7.2. Start Pipeline
Start an execution of the pipeline, supplying values for its parameters.

In [43]:
# Launch one execution, supplying a value for every pipeline parameter.
# NOTE(review): several values differ from the parameter defaults: LearningRate
# (0.000012 vs 0.00001), ValidationStepsPerEpoch (64 vs 50) and MinAccuracyValue
# (0.01 vs 0.33), which makes the accuracy gate far more permissive.
# NOTE(review): the processing step's job arguments were built from the parameters'
# default values, so TrainSplitPercentage, ValidationSplitPercentage, TestSplitPercentage,
# BalanceDataset, FeatureStoreOfflinePrefix and FeatureGroupName given here do not change
# what prepare_data.py receives.
execution = pipeline.start(
    parameters={
        'InputData': raw_input_data_s3_uri,
        'ProcessingInstanceCount': 1,
        'ProcessingInstanceType': 'ml.c5.2xlarge',
        'MaxSeqLength': 128,
        'BalanceDataset': 'True',
        'TrainSplitPercentage': 0.9,
        'ValidationSplitPercentage': 0.05,
        'TestSplitPercentage': 0.05,
        'FeatureStoreOfflinePrefix': 'reviews-feature-store-' + str(timestamp),
        'FeatureGroupName': 'reviews-feature-group-' + str(timestamp),
        'Epochs': 3,
        'LearningRate': 0.000012,
        'TrainBatchSize': 64,
        'TrainStepsPerEpoch': 50,
        'ValidationBatchSize': 64,
        'ValidationStepsPerEpoch': 64,
        'FreezeBertLayer': 'False',
        'Seed': 42,
        'TrainInstanceCount': 1,
        'TrainInstanceType': 'ml.c5.9xlarge',
        'TrainVolumeSize': 256,
        'InputMode': 'File',
        'RunValidation': 'True',
        'MinAccuracyValue': 0.01,
        'ModelApprovalStatus': 'PendingManualApproval',
        'DeployInstanceType': 'ml.m5.large',
        'DeployInstanceCount': 1,
    }
)

print(execution.arn)

arn:aws:sagemaker:us-east-1:357362156822:pipeline/BERT-pipeline-1689263605/execution/jmbm64xi30lz



### 7.3. Describe the pipeline execution

Describe the execution instance.

In [44]:
from pprint import pprint

# Snapshot of the execution's metadata and status at the moment this cell runs.
execution_run = execution.describe()
pprint(execution_run)

{'CreatedBy': {'DomainId': 'd-zp6p7kqxczim',
               'UserProfileArn': 'arn:aws:sagemaker:us-east-1:357362156822:user-profile/d-zp6p7kqxczim/sagemaker-user-profile-us-east-1',
               'UserProfileName': 'sagemaker-user-profile-us-east-1'},
 'CreationTime': datetime.datetime(2023, 7, 13, 16, 32, 58, 37000, tzinfo=tzlocal()),
 'LastModifiedBy': {'DomainId': 'd-zp6p7kqxczim',
                    'UserProfileArn': 'arn:aws:sagemaker:us-east-1:357362156822:user-profile/d-zp6p7kqxczim/sagemaker-user-profile-us-east-1',
                    'UserProfileName': 'sagemaker-user-profile-us-east-1'},
 'LastModifiedTime': datetime.datetime(2023, 7, 13, 16, 32, 58, 37000, tzinfo=tzlocal()),
 'PipelineArn': 'arn:aws:sagemaker:us-east-1:357362156822:pipeline/BERT-pipeline-1689263605',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:357362156822:pipeline/BERT-pipeline-1689263605/execution/jmbm64xi30lz',
 'PipelineExecutionDisplayName': 'execution-1689265978166',
 'PipelineExecutionSt

Print the execution display name and its ARN

In [45]:
# Human-readable display name assigned to this execution.
execution_run_name = execution_run['PipelineExecutionDisplayName']
print(execution_run_name)

execution-1689265978166


In [46]:
# Full ARN of this execution, taken from the describe() snapshot.
pipeline_execution_arn = execution_run['PipelineExecutionArn']
print(pipeline_execution_arn)

arn:aws:sagemaker:us-east-1:357362156822:pipeline/BERT-pipeline-1689263605/execution/jmbm64xi30lz


<a name='c2w3-7.4.'></a>
### 7.4. Describe completed pipeline

In [47]:
import time

# Pause briefly so the first step has time to start, then list the execution's steps.
time.sleep(30)
execution.list_steps()

[{'StepName': 'Processing',
  'StartTime': datetime.datetime(2023, 7, 13, 16, 32, 58, 760000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:357362156822:processing-job/pipelines-jmbm64xi30lz-Processing-4AyuzlGhnx'}}}]

### 7.5. Wait for the pipeline to complete



Poll the pipeline execution summary and wait for the execution to finish.


In [48]:
%%time

import time
from pprint import pprint

sm = boto3.Session().client(service_name='sagemaker', region_name=region)

executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
print(pipeline_execution_status)

while pipeline_execution_status=='Executing':
    try:
        executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
        pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
    except Exception as e:
        print('Please wait...')
        time.sleep(30)    
    
pprint(executions_response)

Executing
[{'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:357362156822:pipeline/BERT-pipeline-1689263605/execution/jmbm64xi30lz',
  'PipelineExecutionDisplayName': 'execution-1689265978166',
  'PipelineExecutionStatus': 'Succeeded',
  'StartTime': datetime.datetime(2023, 7, 13, 16, 32, 58, 37000, tzinfo=tzlocal())}]
CPU times: user 27.9 s, sys: 1.39 s, total: 29.3 s
Wall time: 31min 57s


Check the final execution status and ARN.

In [49]:
# Status of the first execution summary returned by the wait cell.
latest_summary = executions_response[0]
pipeline_execution_status = latest_summary['PipelineExecutionStatus']
print(pipeline_execution_status)

Succeeded


In [50]:
# ARN of the first execution summary returned by the wait cell.
latest_summary = executions_response[0]
pipeline_execution_arn = latest_summary['PipelineExecutionArn']
print(pipeline_execution_arn)

arn:aws:sagemaker:us-east-1:357362156822:pipeline/BERT-pipeline-1689263605/execution/jmbm64xi30lz



# 8. Evaluate the model


### 8.1. Describe evaluation metrics
Download the resulting evaluation.json file from S3 and print the report.

In [51]:
processing_job_name = None

# Find the SageMaker processing job launched by the 'Processing' step. Iterating in
# reverse and keeping the last match selects the first such step in list_steps() order.
for execution_step in reversed(execution.list_steps()):
    if execution_step['StepName'] == 'Processing':
        processing_job_name = execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]

# Fail with a clear message rather than passing None to describe_processing_job.
if processing_job_name is None:
    raise ValueError("No 'Processing' step found in pipeline execution {}".format(execution.arn))

describe_transform_processing_job_response = sm.describe_processing_job(ProcessingJobName=processing_job_name)

# Outputs[0] is the job's first configured output channel (sentiment-train in the run shown below).
transform_output_s3_uri = describe_transform_processing_job_response['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
print('Transform output {}'.format(transform_output_s3_uri))

Transform output s3://sagemaker-us-east-1-357362156822/sagemaker-scikit-learn-2023-07-13-16-12-56-082/output/sentiment-train


In [52]:
# list the files in the resulting output S3 path
!aws s3 ls --recursive $transform_output_s3_uri

2023-07-13 16:44:13    4910516 sagemaker-scikit-learn-2023-07-13-16-12-56-082/output/sentiment-train/part-algo-1-womens_clothing_ecommerce_reviews.tsv



Pull the job name of the model-evaluation step, then get the S3 path of the evaluation metrics.


In [53]:
processing_job_name = None

# Find the processing job launched by the 'EvaluateModel' step (first match in list_steps() order).
for execution_step in reversed(execution.list_steps()):
    if execution_step['StepName'] == 'EvaluateModel':
        processing_job_name = execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]

# Fail with a clear message rather than passing None to describe_processing_job.
if processing_job_name is None:
    raise ValueError("No 'EvaluateModel' step found in pipeline execution {}".format(execution.arn))

describe_evaluation_processing_job_response = sm.describe_processing_job(ProcessingJobName=processing_job_name)

# The evaluation step defines a single output channel ('metrics').
evaluation_metrics_s3_uri = describe_evaluation_processing_job_response['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
print('Evaluation output {}'.format(evaluation_metrics_s3_uri))

Evaluation output s3://sagemaker-us-east-1-357362156822/sagemaker-scikit-learn-2023-07-13-16-28-08-354/output/metrics



### 8.2. Review the evaluation report

Download the evaluation report and print the accuracy.

In [54]:
from pprint import pprint

# Fetch evaluation.json from the metrics prefix, then pretty-print the parsed report.
evaluation_json = sagemaker.s3.S3Downloader.read_file(evaluation_metrics_s3_uri + "/evaluation.json")
pprint(json.loads(evaluation_json))

{'metrics': {'accuracy': {'value': 0.6957928802588996}}}


<a name='c2w3-8.3.'></a>
### 8.3. List pipeline artifacts

Print the ARN and job name of the training job.


In [55]:
training_job_arn = None

# Take the first 'Train' step reported for this execution.
for execution_step in execution.list_steps():
    if execution_step['StepName'] == 'Train':
        training_job_arn = execution_step['Metadata']['TrainingJob']['Arn']
        pprint(execution_step)
        break
print('Training job ARN: {}'.format(training_job_arn))

# Without this guard, a missing step surfaces as an opaque AttributeError on None.split.
if training_job_arn is None:
    raise ValueError("No 'Train' step found in pipeline execution {}".format(execution.arn))

training_job_name = training_job_arn.split('/')[-1]
print('Training job Name: {}'.format(training_job_name))

{'AttemptCount': 0,
 'EndTime': datetime.datetime(2023, 7, 13, 16, 58, 57, 924000, tzinfo=tzlocal()),
 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:357362156822:training-job/pipelines-jmbm64xi30lz-Train-2vb6sJTDAc'}},
 'StartTime': datetime.datetime(2023, 7, 13, 16, 44, 21, 280000, tzinfo=tzlocal()),
 'StepName': 'Train',
 'StepStatus': 'Succeeded'}
Training job ARN: arn:aws:sagemaker:us-east-1:357362156822:training-job/pipelines-jmbm64xi30lz-Train-2vb6sJTDAc
Training job Name: pipelines-jmbm64xi30lz-Train-2vb6sJTDAc


Print the pipeline artifacts.

In [56]:
# Reset both job names before the lineage loop fills them in again.
processing_job_name = training_job_name = None

In [57]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())


def _display_lineage(lineage_table):
    """Display a lineage table, skipping the None that viz.show() returns for some steps
    (e.g. the condition step), which would otherwise render as a stray "None"."""
    if lineage_table is not None:
        display(lineage_table)


# list_steps() appears to list the most recent step first, so reverse it to
# walk the pipeline in execution order (Processing -> Train -> ...).
for execution_step in reversed(execution.list_steps()):
    pprint(execution_step)
    step_name = execution_step['StepName']
    if step_name == 'Processing':
        # The job name is the last path segment of the processing-job ARN.
        processing_job_name = execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]
        print('Processing job name: {}'.format(processing_job_name))
        _display_lineage(viz.show(processing_job_name=processing_job_name))
    elif step_name == 'Train':
        training_job_name = execution_step['Metadata']['TrainingJob']['Arn'].split('/')[-1]
        print('Training job name: {}'.format(training_job_name))
        _display_lineage(viz.show(training_job_name=training_job_name))
    else:
        _display_lineage(viz.show(pipeline_execution_step=execution_step))
        # Pause between generic step lookups; presumably to avoid throttling the lineage APIs.
        time.sleep(5)

{'AttemptCount': 0,
 'EndTime': datetime.datetime(2023, 7, 13, 16, 44, 20, 285000, tzinfo=tzlocal()),
 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:357362156822:processing-job/pipelines-jmbm64xi30lz-Processing-4AyuzlGhnx'}},
 'StartTime': datetime.datetime(2023, 7, 13, 16, 32, 58, 760000, tzinfo=tzlocal()),
 'StepName': 'Processing',
 'StepStatus': 'Succeeded'}
Processing job name: pipelines-jmbm64xi30lz-Processing-4AyuzlGhnx


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...-16-32-36-034/input/code/prepare_data.py,Input,DataSet,ContributedTo,artifact
1,s3://dlai-practical-data-science/data/raw/,Input,DataSet,ContributedTo,artifact
2,68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3,Input,Image,ContributedTo,artifact
3,s3://...07-13-16-12-56-082/output/sentiment-test,Output,DataSet,Produced,artifact
4,s3://...16-12-56-082/output/sentiment-validation,Output,DataSet,Produced,artifact
5,s3://...7-13-16-12-56-082/output/sentiment-train,Output,DataSet,Produced,artifact


{'AttemptCount': 0,
 'EndTime': datetime.datetime(2023, 7, 13, 16, 58, 57, 924000, tzinfo=tzlocal()),
 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:357362156822:training-job/pipelines-jmbm64xi30lz-Train-2vb6sJTDAc'}},
 'StartTime': datetime.datetime(2023, 7, 13, 16, 44, 21, 280000, tzinfo=tzlocal()),
 'StepName': 'Train',
 'StepStatus': 'Succeeded'}
Training job name: pipelines-jmbm64xi30lz-Train-2vb6sJTDAc


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...16-12-56-082/output/sentiment-validation,Input,DataSet,ContributedTo,artifact
1,s3://...7-13-16-12-56-082/output/sentiment-train,Input,DataSet,ContributedTo,artifact
2,76310...onaws.com/pytorch-training:1.6.0-cpu-py3,Input,Image,ContributedTo,artifact
3,s3://...0lz-Train-2vb6sJTDAc/output/model.tar.gz,Output,Model,Produced,artifact


{'AttemptCount': 0,
 'EndTime': datetime.datetime(2023, 7, 13, 17, 5, 23, 604000, tzinfo=tzlocal()),
 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:357362156822:processing-job/pipelines-jmbm64xi30lz-EvaluateModel-nabzeJuM0c'}},
 'StartTime': datetime.datetime(2023, 7, 13, 16, 59, 0, 280000, tzinfo=tzlocal()),
 'StepName': 'EvaluateModel',
 'StepStatus': 'Succeeded'}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...311/input/code/evaluate_model_metrics.py,Input,DataSet,ContributedTo,artifact
1,s3://...07-13-16-12-56-082/output/sentiment-test,Input,DataSet,ContributedTo,artifact
2,s3://...0lz-Train-2vb6sJTDAc/output/model.tar.gz,Input,Model,ContributedTo,artifact
3,68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3,Input,Image,ContributedTo,artifact
4,s3://...n-2023-07-13-16-28-08-354/output/metrics,Output,DataSet,Produced,artifact


{'AttemptCount': 0,
 'EndTime': datetime.datetime(2023, 7, 13, 17, 5, 24, 490000, tzinfo=tzlocal()),
 'Metadata': {'Condition': {'Outcome': 'True'}},
 'StartTime': datetime.datetime(2023, 7, 13, 17, 5, 24, 103000, tzinfo=tzlocal()),
 'StepName': 'AccuracyCondition',
 'StepStatus': 'Succeeded'}


None

{'AttemptCount': 0,
 'EndTime': datetime.datetime(2023, 7, 13, 17, 5, 26, 747000, tzinfo=tzlocal()),
 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:357362156822:model/pipelines-jmbm64xi30lz-createmodel-fttmuoojxa'}},
 'StartTime': datetime.datetime(2023, 7, 13, 17, 5, 25, 64000, tzinfo=tzlocal()),
 'StepName': 'CreateModel',
 'StepStatus': 'Succeeded'}


None

{'AttemptCount': 0,
 'EndTime': datetime.datetime(2023, 7, 13, 17, 5, 26, 446000, tzinfo=tzlocal()),
 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:357362156822:model-package/bert-reviews-1689263605/1'}},
 'StartTime': datetime.datetime(2023, 7, 13, 17, 5, 25, 64000, tzinfo=tzlocal()),
 'StepName': 'RegisterModel',
 'StepStatus': 'Succeeded'}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...0lz-Train-2vb6sJTDAc/output/model.tar.gz,Input,Model,ContributedTo,artifact
1,76310...aws.com/pytorch-inference:1.6.0-cpu-py36,Input,Image,ContributedTo,artifact
2,bert-reviews-1689263605-1-PendingManualApprova...,Input,Approval,ContributedTo,action
3,BERT-Reviews-1689263605-1689267925-aws-model-p...,Output,ModelGroup,AssociatedWith,context


<a name='c2w3-9.'></a>
# 9. Deploy and test the model

### 9.1. Approve trained model


Get the model package ARN.

In [58]:
# Find the RegisterModel step and read the ARN of the model package it registered.
register_step = next(
    (step for step in execution.list_steps() if step['StepName'] == 'RegisterModel'),
    None,
)
# Fail loudly rather than reuse a stale `model_package_arn` left in the kernel
# by an earlier run (or hit a NameError on a fresh kernel).
if register_step is None:
    raise ValueError("No 'RegisterModel' step found in the pipeline execution")

model_package_arn = register_step['Metadata']['RegisterModel']['Arn']
print(model_package_arn)

arn:aws:sagemaker:us-east-1:357362156822:model-package/bert-reviews-1689263605/1


Update the model package with the Approved status to prepare for deployment.

The model must be Approved before it can be deployed.

In [59]:
# Mark the registered model package as Approved; only approved packages are deployed.
approval_request = {
    'ModelPackageArn': model_package_arn,
    'ModelApprovalStatus': "Approved",
}
model_package_update_response = sm.update_model_package(**approval_request)

pprint(model_package_update_response)

{'ModelPackageArn': 'arn:aws:sagemaker:us-east-1:357362156822:model-package/bert-reviews-1689263605/1',
 'ResponseMetadata': {'HTTPHeaders': {'content-length': '102',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Thu, 13 Jul 2023 17:09:41 GMT',
                                      'x-amzn-requestid': '1605eb12-dcbf-4214-b14b-458820fe14b4'},
                      'HTTPStatusCode': 200,
                      'RequestId': '1605eb12-dcbf-4214-b14b-458820fe14b4',
                      'RetryAttempts': 0}}


<a name='c2w3-9.2.'></a>
### 9.2. Deploy model

Get the model ARN and the model name from it.

In [60]:
# Find the CreateModel step. Initialize explicitly and fail if it is missing, so a
# stale `model_arn` from an earlier run is never silently reused.
create_model_step = next(
    (step for step in execution.list_steps() if step['StepName'] == 'CreateModel'),
    None,
)
if create_model_step is None:
    raise ValueError("No 'CreateModel' step found in the pipeline execution")

model_arn = create_model_step['Metadata']['Model']['Arn']
print(model_arn)

# The SageMaker model name is the last path segment of the model ARN.
model_name = model_arn.split('/')[-1]
print(model_name)

RegisterModel
CreateModel
arn:aws:sagemaker:us-east-1:357362156822:model/pipelines-jmbm64xi30lz-createmodel-fttmuoojxa
pipelines-jmbm64xi30lz-createmodel-fttmuoojxa


<a name='c2w3-9.3.'></a>
### 9.3. Create endpoint from registry

Configure the endpoint.

In [61]:
endpoint_config_name = 'bert-model-epc-{}'.format(timestamp)
print(endpoint_config_name)

# One production variant: the pipeline-created model on a single ml.m5.xlarge
# instance, receiving all traffic.
production_variant = {
    'VariantName': 'AllTraffic',
    'ModelName': model_name,
    'InstanceType': 'ml.m5.xlarge',
    'InitialInstanceCount': 1,
    'InitialVariantWeight': 1,
}

create_endpoint_config_response = sm.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[production_variant],
)

bert-model-epc-1689263605


Create the endpoint.

In [62]:
pipeline_endpoint_name = 'bert-model-ep-{}'.format(timestamp)
print("EndpointName={}".format(pipeline_endpoint_name))

# Start creating the endpoint from the config above. The endpoint is not
# InService yet when this returns; a later cell waits for it.
create_endpoint_response = sm.create_endpoint(
    EndpointConfigName=endpoint_config_name,
    EndpointName=pipeline_endpoint_name,
)
endpoint_arn = create_endpoint_response['EndpointArn']
print(endpoint_arn)

EndpointName=bert-model-ep-1689263605
arn:aws:sagemaker:us-east-1:357362156822:endpoint/bert-model-ep-1689263605


In [63]:
# Import from the public IPython.display module; importing display from
# IPython.core.display is deprecated.
from IPython.display import display, HTML

# Render a link to this endpoint in the SageMaker console ("_blank" opens a new tab).
display(HTML('<b>Review <a target="_blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/endpoints/{}">SageMaker REST Endpoint</a></b>'.format(region, pipeline_endpoint_name)))

In [64]:
%%time

while True:
    try: 
        waiter = sm.get_waiter('endpoint_in_service')
        print('Waiting for endpoint to be in `InService`...')
        waiter.wait(EndpointName=pipeline_endpoint_name)
        break;
    except:
        print('Waiting for endpoint...')
        endpoint_status = sm.describe_endpoint(EndpointName=pipeline_endpoint_name)['EndpointStatus']
        print('Endpoint status: {}'.format(endpoint_status))
        if endpoint_status == 'Failed':
            break
        time.sleep(30)
        
print('Endpoint deployed.')

Waiting for endpoint to be in `InService`...
Endpoint deployed.
CPU times: user 48.7 ms, sys: 23.1 ms, total: 71.8 ms
Wall time: 5min 1s


<a name='c2w3-9.4.'></a>
### 9.4. Test model

Predict the sentiment of sample `review_body` texts and review the results:

In [65]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONLinesSerializer
from sagemaker.deserializers import JSONLinesDeserializer

# Sample review texts; each is wrapped as one JSON Lines record with a "features" list.
sample_reviews = [
    "I love this product!",
    "OK, but not great.",
    "This is not the right product.",
]
inputs = [{"features": [review]} for review in sample_reviews]

# Send JSON Lines to the deployed endpoint and parse JSON Lines back.
predictor = Predictor(
    endpoint_name=pipeline_endpoint_name,
    serializer=JSONLinesSerializer(),
    deserializer=JSONLinesDeserializer(),
    sagemaker_session=sess,
)

predicted_classes = predictor.predict(inputs)

for prediction in predicted_classes:
    label, probability = prediction['predicted_label'], prediction['probability']
    print("Predicted class {} with probability {}".format(label, probability))

Predicted class 1 with probability 0.9020829200744629
Predicted class 0 with probability 0.5049399733543396
Predicted class -1 with probability 0.7469818592071533
