# MLOps - NLP Lab with Amazon SageMaker

**Step 5** - *Automate Inference with the AWS Step Functions Data Science SDK*

## Initialization
---
### Install the AWS Step Functions Data Science SDK

In [None]:
!pip install --upgrade -q stepfunctions

### Set up the environment

In [None]:
import os
import json
import uuid
import sagemaker
from sagemaker.utils import name_from_base
from sagemaker.pytorch import PyTorchModel
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput
import stepfunctions
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow, cloudformation
from stepfunctions.steps import Chain, ProcessingStep, TransformStep, Catch, Fail, Succeed

# Shared SageMaker session used by the cells below.
sagemaker_session = sagemaker.Session()

# One IAM role serves both services in this lab: SageMaker jobs run with it and
# Step Functions assumes it to orchestrate them.
# NOTE(review): the role must also trust states.amazonaws.com for Step Functions — confirm.
role = sagemaker.get_execution_role()
workflow_execution_role = role

# Default SageMaker bucket; replace with an explicit bucket name if preferred.
bucket = sagemaker_session.default_bucket()

In [None]:
def read_location_file(path, label):
    """Return the first line of a hand-off file written by an earlier notebook.

    Prints ``label | value`` on success, or a hint that the previous notebook
    was not fully executed when ``path`` does not exist.

    Args:
        path: Path of the text file to read.
        label: Human-readable name of the value, used in the printed messages.

    Returns:
        The first line with surrounding whitespace (including the trailing
        newline) removed, or ``None`` when the file is missing.
    """
    if not os.path.exists(path):
        print(f'{label} file not found ({path}): check that the previous notebook was fully executed.')
        return None
    with open(path, 'r') as f:
        # strip() instead of keeping the newline (which would end up inside S3 URIs)
        # or slicing off the last character (which drops a real character when the
        # file has no trailing newline).
        value = f.readline().strip()
    print(f'{label:<26} | {value}')
    return value


# Missing files yield None rather than leaving the names undefined (or stale
# from an earlier run in the same kernel).
input_location_fname = '../1_prepare_data/processing_input_location.txt'
processing_input = read_location_file(input_location_fname, 'Processing input location')

ecr_image_fname = '../1_prepare_data/ecr_image_name.txt'
container = read_location_file(ecr_image_fname, 'Processing ECR Image ID')

model_artifact_fname = '../2_train_model/model_artifact_location.txt'
model_artefact = read_location_file(model_artifact_fname, 'Model artifact S3 location')

## Defining workflow steps
---
### Execution input placeholders

In [None]:
# Unique run identifier: used as the SageMaker job name and as the S3 prefix of this run.
job_name = uuid.uuid1().hex

# Every S3 location of the run hangs off one prefix.
run_prefix = f's3://{bucket}/{job_name}/data'
processing_output = f'{run_prefix}/processed/'
# Presumably written by the processing job into processing_output — confirm against the container.
# NOTE(review): unlike the processing paths, this is a plain string baked into the
# workflow definition, so it does not follow a different Processing.Output at execution time.
transform_input = processing_output + 'test_batch_transform.csv'
transform_output = f'{run_prefix}/predicted/'

# Placeholders resolved from the input dict passed to workflow execute().
processing_schema = {"Input": str, "Output": str}
execution_input = ExecutionInput(schema={"JobName": str, "Processing": processing_schema})

### Create data processor

In [None]:
# Processing job runs the custom container image whose name was read from ecr_image_name.txt.
data_processor = Processor(
    image_uri=container,
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    volume_size_in_gb=30,
    max_runtime_in_seconds=1200,  # 20 minutes
)

In [None]:
# Local paths inside the processing container.
input_folder = '/opt/ml/processing/input'
output_folder = '/opt/ml/processing/output'

# The S3 side of both mappings is an execution-input placeholder, filled in when the workflow runs.
processing_in = ProcessingInput(
    source=execution_input["Processing"]["Input"],
    destination=input_folder,
    input_name='input',
)
processing_out = ProcessingOutput(
    source=output_folder,
    destination=execution_input["Processing"]["Output"],
    output_name='preprocessed',
)

inputs = [processing_in]
outputs = [processing_out]

### Create SageMaker model and data transformer

In [None]:
# PyTorch model built from the artifact produced by the training notebook;
# predict_batch.py (in source_dir) is the inference entry point.
model = PyTorchModel(model_data=model_artefact,
                     name=name_from_base('bert-model'),
                     role=role,
                     entry_point='predict_batch.py',
                     source_dir='source_dir',
                     framework_version='1.5.0',
                     # SageMaker Python SDK v2 raises ValueError when framework_version is set
                     # without py_version (and no image_uri); 'py3' was the v1 default, so
                     # stating it explicitly is a no-op on v1.
                     py_version='py3')

# Batch transformer writing predictions under transform_output.
# NOTE(review): model.transformer() also registers the model in SageMaker, which is what
# lets the TransformStep refer to it by model.name — confirm for the installed SDK version.
transformer = model.transformer(
    instance_count=1,
    instance_type='ml.m5.xlarge',
    strategy='SingleRecord',
    assemble_with='Line',
    accept='text/csv',
    max_concurrent_transforms=50,
    output_path=transform_output
)

## Assembling workflow steps
---

In [None]:
# Command-line arguments passed to the processing container.
processing_args = [f"--input={input_folder}", f"--output={output_folder}"]

processing_step = ProcessingStep(
    state_id="Process Data",
    processor=data_processor,
    job_name=execution_input["JobName"],
    inputs=inputs,
    outputs=outputs,
    container_arguments=processing_args,
    # Step result is stored under $.Processing in the state output.
    result_path="$.Processing",
)

In [None]:
# Batch inference over the CSV produced by processing, one line per record;
# join_source='Input' combines each input record with its prediction.
transformer_step = TransformStep(
    state_id='Predict Batch',
    transformer=transformer,
    job_name=execution_input['JobName'],
    model_name=model.name,
    data=transform_input,
    content_type='text/csv',
    split_type='Line',
    join_source='Input',
    result_path="$.Inference",
)

In [None]:
# Terminal states of the workflow.
failed = Fail(state_id="Failed")
succeed = Succeed(state_id="Succeed")

# Any error ("States.ALL") raised by a SageMaker step routes to the Failed state.
catch_failures = Catch(error_equals=["States.ALL"], next_step=failed)
for sagemaker_step in (processing_step, transformer_step):
    sagemaker_step.add_catch(catch_failures)

## Create workflow pipeline
---
By using the `Chain` utility, we can chain all the above steps together so that they run sequentially. We can then output the entire workflow as JSON. That JSON can either run on its own or be embedded in a larger AWS CloudFormation template, for example one that also provisions instances and sets up network security. Rendering the graph only visualizes the workflow in the notebook. The state machine itself is created in the AWS Step Functions console when we call `create()` on the workflow below.

In [None]:
# Processing, then batch transform, then success — strictly in sequence.
workflow_graph = Chain([processing_step, transformer_step, succeed])

workflow_pipeline = Workflow(
    name="BatchWorkflow",
    definition=workflow_graph,
    role=workflow_execution_role,
    execution_input=execution_input,
)

In [None]:
# Amazon States Language JSON generated from the chained steps.
definition_json = workflow_pipeline.definition.to_json(pretty=True)
print(definition_json)

In [None]:
# Visualize the workflow vertically; the returned object is displayed as the cell output.
workflow_graph_view = workflow_pipeline.render_graph(portrait=True)
workflow_graph_view

### Create or update the state machine

In [None]:
# create() does not overwrite a state machine that already exists under the same name
# (e.g. when this notebook is re-run): it reuses the existing ARN and keeps the old
# definition, with the previous model name and S3 paths. update() pushes the current
# definition and role so the deployed state machine matches this run.
# NOTE(review): AWS documents that executions started right after UpdateStateMachine may
# briefly still use the previous definition.
workflow_pipeline.create()
workflow_pipeline.update(definition=workflow_graph, role=workflow_execution_role)

## Workflow pipeline inputs
---
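Run the following cell to start an execution. Then head over to the **[Step Functions console](https://eu-west-1.console.aws.amazon.com/states/home)** to inspect the state machine we just created and follow the execution. The link points to the eu-west-1 region; switch to your own region if needed.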

In [None]:
# Concrete values for the ExecutionInput schema declared earlier in this notebook.
# NOTE(review): SageMaker job names must be unique, so re-executing with the same
# job_name will presumably fail the processing step — regenerate job_name to re-run.
processing_paths = {"Input": processing_input, "Output": processing_output}
execution_inputs = {"JobName": job_name, "Processing": processing_paths}

workflow_pipeline.execute(inputs=execution_inputs)

### Generate CloudFormation template

In [None]:
# CloudFormation template describing this state machine, for reuse in larger stacks.
cloudformation_template = workflow_pipeline.get_cloudformation_template()
print(cloudformation_template)