# Build a SageMaker Pipeline to train and register the injury narrative BERT classifier

In [2]:
%%capture
!pip install tensorflow
!pip install transformers
!pip install nltk
!pip install -U sagemaker

In [3]:
import pandas as pd
import tensorflow as tf
import re
import nltk
import string
from nltk import word_tokenize
from sklearn.model_selection import train_test_split
from tensorflow.keras import activations, optimizers, losses
from sklearn.feature_extraction.text import TfidfVectorizer,CountVectorizer
import numpy as np
import sagemaker
from sagemaker import get_execution_role
import joblib 
import collections

In [4]:
# Record the TensorFlow version installed in the notebook kernel.
print(tf.__version__)

2.6.0


In [5]:

# Session, execution role and default S3 bucket shared by the cells below.
sagemaker_session = sagemaker.Session()
role = get_execution_role()
default_bucket = bucket = sagemaker_session.default_bucket()
bucket

'sagemaker-us-east-1-979294212144'

In [6]:
# Display the resolved execution role ARN.
# NOTE(review): the saved output of this cell contains the AWS account id —
# consider clearing it before sharing the notebook.
role

'arn:aws:iam::979294212144:role/service-role/AmazonSageMaker-ExecutionRole-20210423T122185'

In [7]:
#%%time
#%run ./src/pre-processing.py --data_path ./data/raw  --train_percentage 0.05 --is_sample_dataset

In [8]:
#%run ./src/evaluate_model_metrics.py --input_data './data/test' \
#            --input_model './output/model/training-BaseBERT-08-02-58-54-2021-08-02-19-58-55-017' \
#            --max_len 45 \
#            --output_data './output/model'

In [9]:
# Dataset sizes, sequence length and optimisation settings used for sizing.
num_records, num_valid_records = 6901, 767
max_len = 45
epochs = 5
batch_size = valid_batch_size = 16

# Only whole batches are counted; floor division drops the partial last batch.
steps_per_epoch = divmod(num_records, batch_size)[0]
validation_steps = divmod(num_valid_records, valid_batch_size)[0]

learning_rate = 5e-5
optimizer = 'adam'

In [10]:
# Show the record count and the derived step counts, one value per line.
print(num_records, steps_per_epoch, validation_steps, sep='\n')

6901
431
47


In [11]:
#%%time
#%run ./src/train.py --train ./data/train --validation ./data/valid --epochs 5 --num_records 138549 --steps_per_epoch 8659 --validation_steps 962

### Pipeline name

In [12]:
import time

# Epoch-second suffix; reused below to name the pipeline, model group and model.
timestamp = int(time.time())
pipeline_name = f'BaseBERT-Injury-Coding-pipeline-{timestamp}'

## Step 1 - Dataset and preprocessing step

## Upload raw dataset

In [13]:
# Upload the local ./data/raw directory to the session bucket under `prefix`
# and keep the returned S3 URI for use as the pipeline's default input.
prefix = 'injury-data/raw'
input_data_train = sagemaker_session.upload_data(
    path='./data/raw',
    bucket=bucket,
    key_prefix=prefix,
)
input_data_train

's3://sagemaker-us-east-1-979294212144/injury-data/raw'

## Configure pre-processing step

In [14]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

In [15]:
# Pre-processing pipeline parameters (7 in total).

# Compute used by the processing jobs.
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.c5.2xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# Dataset selection and sampling.
train_percentage = ParameterFloat(name="TrainPercentage", default_value=0.05)
is_sample_dataset = ParameterString(name="SampleDataset", default_value="True")
input_data = ParameterString(name="InputData", default_value=input_data_train)

# Tokenisation settings.
transformer_model = ParameterString(name="TransformerModel", default_value='bert-base-uncased')
max_seq_length = ParameterInteger(name="MaxSeqLength", default_value=45)

In [16]:
# AWS region of the active SageMaker session; used for image lookups and job env.
region = sagemaker_session.boto_region_name

In [17]:
# Resolve the TensorFlow training image URI while the notebook runs.
# image_uris.retrieve() needs a concrete instance type string to choose between
# the CPU and GPU image, so pass the parameter's default value rather than the
# pipeline Parameter object (which is only resolved at pipeline execution time).
processing_image_uri = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version="2.4.1",
    py_version="py37",
    instance_type=processing_instance_type.default_value,
    image_scope="training"
)
print(processing_image_uri)

763104351884.dkr.ecr.us-east-1.amazonaws.com/tensorflow-training:2.4.1-cpu-py37


In [18]:
from sagemaker.sklearn.processing import SKLearnProcessor,ScriptProcessor

# scikit-learn processing container that runs the pre-processing script.
# The job may run for at most 18000 seconds (5 hours).
processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    max_runtime_in_seconds=18000,
    env={'AWS_DEFAULT_REGION': sagemaker_session.boto_region_name},
)

In [19]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# Raw data is mounted into the container, sharded by S3 key across instances.
processing_inputs = [
    ProcessingInput(
        input_name='raw-input-data',
        source=input_data,
        destination='/opt/ml/processing/input/data/',
        s3_data_distribution_type='ShardedByS3Key',
    )
]

# One output per split, each uploaded when the job ends.
processing_outputs = [
    ProcessingOutput(
        output_name=f'processed-{split}',
        source=f'/opt/ml/processing/output/processed/{split}',
        s3_upload_mode='EndOfJob',
    )
    for split in ('train', 'validation', 'test')
]

# NOTE: the `.default_value` reads below happen while this cell runs, so the
# script arguments contain the literal defaults of these parameters, and the
# `--is_sample_dataset` flag is always passed.
processing_step = ProcessingStep(
    name='Pre-Processing',
    processor=processor,
    code='./src/pre-processing.py',
    inputs=processing_inputs,
    outputs=processing_outputs,
    job_arguments=[
        '--train_percentage', str(train_percentage.default_value),
        '--max_len', str(max_seq_length.default_value),
        '--transformer_model', str(transformer_model.default_value),
        '--is_sample_dataset',
    ],
)

print(processing_step)

ProcessingStep(name='Pre-Processing', step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)


In [20]:
import json

# List the processing-step properties that later steps can reference.
# default=str stringifies the values that are not JSON-serialisable.
print(json.dumps(vars(processing_step.properties), sort_keys=True, indent=4, default=str))

{
    "AppSpecification": "<sagemaker.workflow.properties.Properties object at 0x7f37e5d72150>",
    "AutoMLJobArn": "<sagemaker.workflow.properties.Properties object at 0x7f37e6b3c550>",
    "CreationTime": "<sagemaker.workflow.properties.Properties object at 0x7f37e6b3ca10>",
    "Environment": "<sagemaker.workflow.properties.Properties object at 0x7f37e5d722d0>",
    "ExitMessage": "<sagemaker.workflow.properties.Properties object at 0x7f37e7bca750>",
    "ExperimentConfig": "<sagemaker.workflow.properties.Properties object at 0x7f37e5d72510>",
    "FailureReason": "<sagemaker.workflow.properties.Properties object at 0x7f37e6b3cc50>",
    "LastModifiedTime": "<sagemaker.workflow.properties.Properties object at 0x7f37e6b3c610>",
    "MonitoringScheduleArn": "<sagemaker.workflow.properties.Properties object at 0x7f37e6b3c8d0>",
    "NetworkConfig": "<sagemaker.workflow.properties.Properties object at 0x7f37e5d723d0>",
    "ProcessingEndTime": "<sagemaker.workflow.properties.Properties

In [21]:
# Inspect the property placeholder for the 'processed-train' output.
print(json.dumps(
    vars(processing_step.properties.ProcessingOutputConfig.Outputs['processed-train']),
    sort_keys=True, indent=4, default=str,
))

{
    "AppManaged": "<sagemaker.workflow.properties.Properties object at 0x7f37e5d72710>",
    "FeatureStoreOutput": "<sagemaker.workflow.properties.Properties object at 0x7f37e6b3c950>",
    "OutputName": "<sagemaker.workflow.properties.Properties object at 0x7f37e6b3cc10>",
    "S3Output": "<sagemaker.workflow.properties.Properties object at 0x7f37e6b3c850>",
    "_path": "Steps.Pre-Processing.ProcessingOutputConfig.Outputs['processed-train']",
    "_shape_names": [
        "ProcessingOutput"
    ]
}


In [22]:
# Inspect the S3Uri property of the 'processed-train' output; its `_path`
# is the reference that downstream steps receive.
print(json.dumps(
    vars(processing_step.properties.ProcessingOutputConfig.Outputs['processed-train'].S3Output.S3Uri),
    sort_keys=True, indent=4, default=str,
))

{
    "__str__": "S3Uri",
    "_path": "Steps.Pre-Processing.ProcessingOutputConfig.Outputs['processed-train'].S3Output.S3Uri",
    "_shape_names": [
        "S3Uri"
    ]
}


## Step 3 - Training Step

In [23]:
# Training pipeline parameters (12 in total).
# NOTE: `epochs`, `num_records`, `learning_rate` and `optimizer` rebind names
# that held plain Python values in an earlier cell; from here on they are
# pipeline Parameter objects.

# Schedule and optimisation.
epochs = ParameterInteger(name="Epochs", default_value=3)
num_records = ParameterInteger(name="NumRecords", default_value=6901)
learning_rate = ParameterFloat(name="LearningRate", default_value=5e-5)

# Batch sizes and steps per epoch.
train_batch_size = ParameterInteger(name="TrainBatchSize", default_value=16)
train_steps_per_epoch = ParameterInteger(name="TrainStepsPerEpoch", default_value=431)
validation_batch_size = ParameterInteger(name="ValidationBatchSize", default_value=16)
validation_steps_per_epoch = ParameterInteger(name="ValidationStepsPerEpoch", default_value=47)

# Training compute.
train_instance_count = ParameterInteger(name="TrainInstanceCount", default_value=1)
train_instance_type = ParameterString(name="TrainInstanceType", default_value="ml.p3.2xlarge")

# Model input length, optimiser name and training input mode.
max_seq_length = ParameterInteger(name="MaxSeqLength", default_value=45)
optimizer = ParameterString(name="optimizer", default_value='Adam')
input_mode = ParameterString(name="InputMode", default_value="File")

In [24]:
# Sanity check: show the default value carried by the TrainBatchSize parameter.
train_batch_size.default_value

16

In [25]:
# Hyperparameters forwarded to the training script.
# Bind the pipeline Parameter objects themselves rather than `.default_value`:
# reading `.default_value` here would freeze the defaults into the pipeline
# definition, so values supplied to `pipeline.start(parameters=...)` (for
# example a different number of epochs) would never reach the training job.
hyperparameters = {
    'max_seq_length': max_seq_length,
    'epochs': epochs,
    'num_records': num_records,
    'learning_rate': learning_rate,
    'batch_size': train_batch_size,
    'steps_per_epoch': train_steps_per_epoch,
    'validation_batch_size': validation_batch_size,
    'validation_steps': validation_steps_per_epoch,
    'optimizer': optimizer,
}

In [26]:
# Metric name -> regex pairs passed to the estimator as metric definitions.
metric_definitions = [
    {'Name': metric_name, 'Regex': pattern}
    for metric_name, pattern in (
        ('train:loss', 'loss: ([0-9\\.]+)'),
        ('train:accuracy', 'acc: ([0-9\\.]+)'),
        ('validation:loss', 'val_loss: ([0-9\\.]+)'),
        ('validation:accuracy', 'val_acc: ([0-9\\.]+)'),
    )
]

In [27]:
from sagemaker.huggingface import HuggingFace

# HuggingFace estimator (TensorFlow 2.4 / transformers 4.4) that runs
# ./src/train.py. Instance settings and input mode are read from the parameter
# defaults when this cell runs.
estimator = HuggingFace(
    # Training code
    entry_point="train.py",
    source_dir="./src/",
    # Container versions
    transformers_version="4.4",
    tensorflow_version="2.4",
    py_version="py37",
    # Infrastructure
    role=role,
    instance_type=train_instance_type.default_value,
    instance_count=train_instance_count.default_value,
    volume_size=50,
    max_run=18000,
    input_mode=input_mode.default_value,
    # Training configuration and metrics
    hyperparameters=hyperparameters,
    metric_definitions=metric_definitions,
    enable_sagemaker_metrics=True,
)



In [28]:
from sagemaker.workflow.steps import CacheConfig

# Step caching is enabled with a one-hour expiry ("PT1H" is the ISO 8601
# duration for one hour).
cache_config = CacheConfig(
    enable_caching=True,
    expire_after="PT1H",
)

In [29]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# Each training channel reads the S3 URI of the matching pre-processing output,
# which also makes this step depend on the pre-processing step.
training_step = TrainingStep(
    name='Train',
    estimator=estimator,
    inputs={
        channel: TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[output_name].S3Output.S3Uri
        )
        for channel, output_name in (
            ('train', 'processed-train'),
            ('validation', 'processed-validation'),
        )
    },
    cache_config=cache_config,
)

print(training_step)

TrainingStep(name='Train', step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None)


## Evaluation Step

In [30]:
from sagemaker.sklearn.processing import SKLearnProcessor,ScriptProcessor

# scikit-learn processing container for the evaluation script, limited to
# 7200 seconds (2 hours) of runtime.
evaluation_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    max_runtime_in_seconds=7200,
    env={'AWS_DEFAULT_REGION': region},
)

In [31]:
from sagemaker.workflow.properties import PropertyFile

# Declares evaluation.json, found in the 'metrics' output of the evaluation
# step, as a property file that the accuracy condition reads.
evaluation_report = PropertyFile(
    name='EvaluationReport', output_name='metrics', path='evaluation.json'
)

In [32]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Inputs: the trained model artifact and the held-out test split.
evaluation_inputs = [
    ProcessingInput(
        source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
        destination='/opt/ml/processing/input/model',
    ),
    ProcessingInput(
        source=processing_step.properties.ProcessingOutputConfig.Outputs['processed-test'].S3Output.S3Uri,
        destination='/opt/ml/processing/input/data',
    ),
]

# Output: the metrics directory, uploaded when the job ends.
evaluation_outputs = [
    ProcessingOutput(
        output_name='metrics',
        s3_upload_mode='EndOfJob',
        source='/opt/ml/processing/output/metrics/',
    ),
]

# --max_len is taken from the parameter default when this cell runs.
evaluation_step = ProcessingStep(
    name='EvaluateModel',
    processor=evaluation_processor,
    code='./src/evaluate_model_metrics.py',
    inputs=evaluation_inputs,
    outputs=evaluation_outputs,
    job_arguments=['--max_len', str(max_seq_length.default_value)],
    property_files=[evaluation_report],
)

## Register model step

In [33]:
# Registration / deployment pipeline parameters (3 in total).
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
deploy_instance_type = ParameterString(name="DeployInstanceType", default_value="ml.m5.large")
deploy_instance_count = ParameterInteger(name="DeployInstanceCount", default_value=1)

In [34]:
# Model package group name, suffixed with the notebook-run timestamp.
model_package_group_name = "BaseBERT-Injury-Coding-{}".format(timestamp)

print(model_package_group_name)

BaseBERT-Injury-Coding-1630331318


In [35]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 

# Attach evaluation.json, taken from the S3 location of the evaluation step's
# first output, to the model package as its model statistics.
evaluation_output_uri = evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(evaluation_output_uri),
        content_type="application/json",
    )
)

print(model_metrics)

<sagemaker.model_metrics.ModelMetrics object at 0x7f37cf7b3f10>


Define deployment image for inference.

In [36]:
# Resolve the TensorFlow inference image URI while the notebook runs.
# image_uris.retrieve() needs a concrete instance type string to choose between
# the CPU and GPU image, so pass the parameter's default value rather than the
# pipeline Parameter object (which is only resolved at pipeline execution time).
inference_image_uri = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version="2.4.1",
    py_version="py37",
    instance_type=deploy_instance_type.default_value,
    image_scope="inference"
)
print(inference_image_uri)

763104351884.dkr.ecr.us-east-1.amazonaws.com/tensorflow-inference:2.4.1-cpu


## Register model

In [37]:
from sagemaker.workflow.step_collections import RegisterModel

# Register the trained artifact, with the inference image and evaluation
# metrics, as a model package in the model package group.
register_step = RegisterModel(
    name="RegisterModel",
    estimator=estimator,
    image_uri=inference_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    model_metrics=model_metrics,
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    content_types=["application/jsonlines"],
    response_types=["application/jsonlines"],
    inference_instances=[deploy_instance_type],
    transform_instances=[deploy_instance_type],  # batch transform is not used in this lab
)

## Create Model Step

In [38]:
from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput

# Model built from the training step's artifact and the inference image.
model_name = f'bert-model-{timestamp}'

model = Model(
    name=model_name,
    role=role,
    sagemaker_session=sagemaker_session,
    image_uri=inference_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
)

# Instance type passed to the create-model step.
create_inputs = CreateModelInput(instance_type=deploy_instance_type)

In [40]:
from sagemaker.workflow.steps import CreateModelStep

# Pipeline step that creates the SageMaker model from `model` and `create_inputs`.
create_step = CreateModelStep(name="CreateModel", model=model, inputs=create_inputs)

# 6. Check accuracy condition step

In [41]:
# Accuracy threshold the evaluated model must reach (default 0.80).
min_accuracy_value = ParameterFloat(name="MinAccuracyValue", default_value=0.80)

In [42]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
# Use the JsonGet from sagemaker.workflow.functions: the one exported by
# sagemaker.workflow.condition_step is deprecated and emits a rename warning.
from sagemaker.workflow.functions import JsonGet

# Compare the accuracy written to the evaluation property file
# (metrics.accuracy.value) with the MinAccuracyValue pipeline parameter.
minimum_accuracy_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluation_step.name,  # this JsonGet takes the step's name, not the step object
        property_file=evaluation_report,
        json_path="metrics.accuracy.value",
    ),
    right=min_accuracy_value,  # minimum accuracy threshold
)

minimum_accuracy_condition_step = ConditionStep(
    name="AccuracyCondition",
    conditions=[minimum_accuracy_condition],
    # Accuracy >= threshold: register the model and create it.
    if_steps=[register_step, create_step],
    # Accuracy below threshold: nothing is registered or created.
    else_steps=[],
)

The class JsonGet has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


# 7. Create pipeline

In [43]:
from sagemaker.workflow.pipeline import Pipeline

# Assemble the pipeline from every declared parameter and the four top-level
# steps. The register/create steps are reached through the condition step.
pipeline = Pipeline(
    name=pipeline_name,
    sagemaker_session=sagemaker_session,
    parameters=[
        # Pre-processing
        input_data,
        processing_instance_count,
        processing_instance_type,
        max_seq_length,
        is_sample_dataset,
        transformer_model,
        train_percentage,
        # Training
        epochs,
        num_records,
        learning_rate,
        optimizer,
        train_batch_size,
        train_steps_per_epoch,
        validation_batch_size,
        validation_steps_per_epoch,
        input_mode,
        train_instance_count,
        train_instance_type,
        # Evaluation gate, registration and deployment
        min_accuracy_value,
        model_approval_status,
        deploy_instance_type,
        deploy_instance_count,
    ],
    steps=[
        processing_step,
        training_step,
        evaluation_step,
        minimum_accuracy_condition_step,
    ],
)


In [44]:
import json
from pprint import pprint

# Render the pipeline definition (a JSON string) as a Python dict for inspection.
definition_json = pipeline.definition()
definition = json.loads(definition_json)

pprint(definition)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'Metadata': {},
 'Parameters': [{'DefaultValue': 's3://sagemaker-us-east-1-979294212144/injury-data/raw',
                 'Name': 'InputData',
                 'Type': 'String'},
                {'DefaultValue': 1,
                 'Name': 'ProcessingInstanceCount',
                 'Type': 'Integer'},
                {'DefaultValue': 'ml.c5.2xlarge',
                 'Name': 'ProcessingInstanceType',
                 'Type': 'String'},
                {'DefaultValue': 45, 'Name': 'MaxSeqLength', 'Type': 'Integer'},
                {'DefaultValue': 'True',
                 'Name': 'SampleDataset',
                 'Type': 'String'},
                {'DefaultValue': 'bert-base-uncased',
                 'Name': 'TransformerModel',
                 'Type': 'String'},
                {'DefaultValue': 0.05,
                 'Name': 'TrainPercentage',
                 'Type': 'Float'},
                {'DefaultValue': 3, 'Name': 'Epochs', 'Type': 'Integer'},
                {'DefaultValue

In [45]:
# upsert() creates the pipeline, or updates it when a pipeline with this name
# already exists. create() fails in that case, which made this cell
# impossible to re-run without first regenerating the pipeline name.
response = pipeline.upsert(role_arn=role)

#pipeline_arn = response["PipelineArn"]
#print(pipeline_arn)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


In [46]:
# Start a pipeline execution with explicit values for every declared parameter.
# TransformerModel is 'bert-base-uncased', matching the parameter's declared
# default; the previous value 'bert-based-uncased' was a typo.
execution = pipeline.start(
    parameters=dict(
        InputData=input_data_train,
        ProcessingInstanceCount=1,
        ProcessingInstanceType='ml.c5.2xlarge',
        MaxSeqLength=45,
        SampleDataset='True',
        TransformerModel = 'bert-base-uncased',
        TrainPercentage=0.05,
        Epochs=1,
        NumRecords = 6901,
        LearningRate=5e-5,
        optimizer = 'Adam',
        TrainBatchSize=16,
        TrainStepsPerEpoch=431,
        ValidationBatchSize=16,
        ValidationStepsPerEpoch=47,
        InputMode= 'File',
        TrainInstanceCount=1,
        TrainInstanceType='ml.p3.2xlarge',
        MinAccuracyValue=0.75,
        ModelApprovalStatus='PendingManualApproval', 
        DeployInstanceType='ml.m5.large',
        DeployInstanceCount=1 
    )
)

print(execution.arn)

arn:aws:sagemaker:us-east-1:979294212144:pipeline/basebert-injury-coding-pipeline-1630331318/execution/sb6rr27jlfk4
