## Lab #2 - Build end-to-end ML workflow with AWS Step Functions Data Science SDK
### Use Case: Industrializing a sample XGBoost model in AWS - Courier default

We will now assume that our model development work is done. We will start preparing the workflow that automates the training (and re-training) of the model with production data, and lay the foundations for hosting this model in production later.

-------

**Courier default description:**

*"We define courier default as the situation in which a courier collects a high amount of cash and then leaves the company without returning the money.

- By "leaving the company" we mean not having any interaction with us for 28 consecutive days after the last delivered order.
- Each region/city/country has a threshold to designate a high amount of cash defined by the business.

The business request made to the Data Science team is to design a machine learning system that detects when a courier is likely to default, before they actually default or before their cash balance becomes too high.

The proposed solution is to deliver a default score for each courier every week. For that, we created a label for each courier over a time window, so we can identify some defaulters, and then fit an XGBoost classifier to predict on current couriers."*
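As a rough illustration of the label described above, the default rule can be sketched in plain Python. This is a minimal sketch under assumptions: the function name, its arguments, and the way a per-city cash threshold is passed in are hypothetical, and the team's actual labelling over a time window may differ.

```python
from datetime import date, timedelta

# Hypothetical labelling helper: field names, the per-city threshold argument
# and the reference date are illustrative assumptions, not the team's code.
INACTIVITY_DAYS = 28  # "no interaction with us for 28 consecutive days"


def is_defaulter(last_delivered_order: date,
                 last_interaction: date,
                 cash_balance: float,
                 city_threshold: float,
                 as_of: date) -> bool:
    """Return True when a courier matches the courier-default definition."""
    # The inactivity window starts at the later of the last delivered order
    # and any later interaction with the courier.
    last_seen = max(last_delivered_order, last_interaction)
    left_company = (as_of - last_seen) >= timedelta(days=INACTIVITY_DAYS)
    # The "high amount of cash" threshold is defined per region/city/country.
    high_cash = cash_balance > city_threshold
    return left_company and high_cash


# 28 days without interaction and a balance above the threshold -> True
print(is_defaulter(date(2020, 1, 1), date(2020, 1, 1), 500.0, 300.0, date(2020, 1, 29)))
```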

-------

***Please make sure you follow the steps for creating the AWS IAM roles required before running this notebook. These are used in the Amazon SageMaker notebook for interacting with other services like AWS Step Functions and AWS Lambda.***
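For orientation only, the trust relationships of the two roles can be sketched as IAM policy documents: AWS Step Functions (`states.amazonaws.com`) must be able to assume the workflow execution role, and AWS Lambda (`lambda.amazonaws.com`) the Lambda role. This is a hedged sketch, not the lab's official setup: the helper name is hypothetical, and the permissions policies (SageMaker, Lambda, S3, `iam:PassRole`, ...) still have to be attached as the lab steps describe.

```python
import json


# Hypothetical helper: trust (assume-role) policy letting one AWS service
# principal assume a role. Permissions policies are NOT included here.
def trust_policy(service_principal: str) -> dict:
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": service_principal},
                "Action": "sts:AssumeRole",
            }
        ],
    }


# Role assumed by AWS Step Functions to run the workflow (workflow_execution_role).
workflow_trust = trust_policy("states.amazonaws.com")
# Role assumed by the AWS Lambda functions created in this notebook (lambda_role).
lambda_trust = trust_policy("lambda.amazonaws.com")

# With boto3 this could be passed as, e.g.:
# boto3.client('iam').create_role(RoleName=..., AssumeRolePolicyDocument=json.dumps(workflow_trust))
print(json.dumps(workflow_trust, indent=2))
```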

**Architecture:**

Note that the data scientists will use this notebook, running Python 3 with the AWS Step Functions Data Science SDK installed, to create the ML workflow. We will also use AWS Lambda for some helper functions in the workflow.

<img src="./sample_architecture.jpg" width="60%">

*For more information on how to create a notebook or how to install the Data Science SDK you can check the documentation* [here](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/readmelink.html#getting-started-with-sample-jupyter-notebooks).

In [None]:
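# Install/upgrade the stepfunctions library...
# This notebook uses SageMaker Python SDK v1 APIs (get_image_uri, train_instance_*, s3_input),
# so pin versions compatible with them (stepfunctions 2.x requires sagemaker>=2).
import sys
!{sys.executable} -m pip install --upgrade "stepfunctions<2.0.0" "sagemaker<2.0.0"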

----------------

### Preparation
Let us start by loading some libraries and setting up the roles in the notebook.

***--- NOTE: Make sure you replace the placeholders below with your bucket info and role ARNs! ---***

In [None]:
import boto3, sagemaker, time, random, uuid, logging, stepfunctions, io, os

from sagemaker.amazon.amazon_estimator import get_image_uri
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep, TransformStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath

sagemaker_execution_role = sagemaker.get_execution_role()
session = sagemaker.Session()
stepfunctions.set_stream_logger(level=logging.INFO)

###################################################
###### REPLACE THE INFO IN THE PLACEHOLDERS #######
###################################################
# Replace with your AWS IAM ROLE ARN... in both places
workflow_execution_role = 'REPLACE WITH YOUR ROLE ARN HERE'
lambda_role = 'REPLACE WITH YOUR ROLE ARN HERE'
# Replace with your Amazon S3 bucket name, prefix, and dataset filename...
bucket = 'REPLACE WITH YOUR BUCKET NAME HERE'
prefix = 'glovo/workflow'
filename = 'joint_dataframe.csv'
###################################################

print('Using AWS StepFunctions and AWS Lambda roles:\n {}\n {}\n'.format(workflow_execution_role, lambda_role))

print('Uploading {} dataset to Amazon S3 at:\n https://s3.console.aws.amazon.com/s3/buckets/{}/{}/'.format(filename, bucket, prefix))
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, filename)).upload_file(filename)

--------

### Pre-processing


#### Automating data preparation with AWS Lambda

Prepare a script that runs your sample feature engineering. We will use an AWS Lambda function for this because we want to automate as much as possible.

Note that we could also rely on Amazon SageMaker Processing for this data pre-processing. We use a Lambda function in this case for simplicity and to save time.
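The Lambda function in the next cell shuffles the dataset and cuts it at 70% and 90% of its rows, which yields a 70% / 20% / 10% train / validation / test split. The stdlib-only sketch below reproduces those proportions so they can be checked without AWS; note that `split_rows` is a hypothetical helper and its shuffle order will not match pandas' `df.sample(frac=1, random_state=1729)`.

```python
import random


# Stdlib-only sketch of the split done in the Lambda with
# np.split(df.sample(frac=1, random_state=1729), [int(0.7 * n), int(0.9 * n)]).
# Only the proportions match; the shuffle order differs from pandas.
def split_rows(rows, seed=1729):
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n = len(shuffled)
    first_cut, second_cut = int(0.7 * n), int(0.9 * n)
    train = shuffled[:first_cut]
    validation = shuffled[first_cut:second_cut]
    test = shuffled[second_cut:]
    return train, validation, test


train, validation, test = split_rows(range(1000))
print(len(train), len(validation), len(test))  # 700 200 100
```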

In [None]:
# Create data pre-processing AWS Lambda:

# First download Pandas and Numpy for use in our AWS Lambda package, as they are not included in the base AWS Lambda runtime...
import os
from urllib.request import urlretrieve
# -p lets you re-run this cell without failing if the folder already exists
!mkdir -p lambda
urlretrieve("https://files.pythonhosted.org/packages/7b/fd/41698f20fd297cef2dc43a72a8ca42d149eaf7d954f1fb2bd3fc366a658d/pandas-0.25.3-cp38-cp38-manylinux1_x86_64.whl", "lambda/pandas-0.25.3-cp38-cp38-manylinux1_x86_64.whl")
urlretrieve("https://files.pythonhosted.org/packages/d7/6a/3fed132c846d1e47963f30376cc041e9dd586d286d931055ad06ff65c6c7/numpy-1.17.4-cp38-cp38-manylinux1_x86_64.whl", "lambda/numpy-1.17.4-cp38-cp38-manylinux1_x86_64.whl")
!unzip -o lambda/pandas-0.25.3-cp38-cp38-manylinux1_x86_64.whl -d lambda
!unzip -o lambda/numpy-1.17.4-cp38-cp38-manylinux1_x86_64.whl -d lambda

# then install the pytz dependency locally...
!pip install -t lambda pytz
# and remove the files no longer needed...
!rm -rf lambda/*.whl lambda/*.dist-info lambda/__pycache__

# prepare the Lambda function code for your pre-processing script...
file_name = 'lambda/lambda_function.py'
def MakeFile(file_name):
    with open(file_name, 'w') as f:
        f.write('''\
import json
import boto3
import pandas
import numpy as np
import os
bucket = os.environ['BUCKET']
prefix = os.environ['PREFIX']
filename = os.environ['FILENAME']
def lambda_handler(event, context):
    s3 = boto3.resource('s3')
    s3.Bucket(bucket).download_file(prefix + "/" + filename, '/tmp/' + filename)
    df = pandas.read_csv("/tmp/" + filename)
    cols = [0, 1, 2, 3, 4, 5, 6, 8, 56, 191, 201, 203, 206, 209, 218, 220]
    df.drop(df.columns[cols],axis=1,inplace=True)
    df = pandas.concat([df['is_defaulter'], df.drop(['is_defaulter'], axis=1)], axis=1)
    train_data, validation_data, test_data = np.split(df.sample(frac=1, random_state=1729), [int(0.7 * len(df)), int(0.9 * len(df))])
    train_data.to_csv('/tmp/train.csv', header=False, index=False)
    validation_data.to_csv('/tmp/validation.csv', header=False, index=False)
    test_data.to_csv('/tmp/test_real.csv', header=False, index=False)
    test_data.drop(['is_defaulter'], axis=1).to_csv('/tmp/test.csv', header=False, index=False)
    s3.Bucket(bucket).Object(os.path.join(prefix, 'train/train.csv')).upload_file('/tmp/train.csv')
    s3.Bucket(bucket).Object(os.path.join(prefix, 'validation/validation.csv')).upload_file('/tmp/validation.csv')
    s3.Bucket(bucket).Object(os.path.join(prefix, 'test/test.csv')).upload_file('/tmp/test.csv')
    return {
        'statusCode': 200,
        'body': ('Data pre-processing complete for ' + filename + ' - ' + bucket + '/' + prefix + '/' + filename)
    }
        ''')
MakeFile(file_name)

# finally, create the deployment package with Pandas, Numpy, and your pre-processing code...
import zipfile
def zipFilesInDir(dirName, zipFileName):
    # create the zip file...
    with zipfile.ZipFile(zipFileName, 'w', zipfile.ZIP_DEFLATED) as zipObj:
        # iterate over all the files in the directory tree
        for folderName, subfolders, filenames in os.walk(dirName):
            for filename in filenames:
                # create complete filepath of file in directory
                filePath = os.path.join(folderName, filename)
                # store it relative to dirName, so lambda_function.py sits at the zip root
                zipObj.write(filePath, arcname=os.path.relpath(filePath, dirName))
zipFilesInDir('lambda', 'lambda_function.zip')
!rm -fr lambda

# check that the package exists (the old try/finally raised NameError if open() failed)
if os.path.exists("lambda_function.zip"):
    print("Lambda file created: lambda_function.zip")
else:
    print("Error - Lambda file not created")

# load the zip file as binary code...
with open('lambda_function.zip', 'rb') as f: 
    code = f.read()

# and create the AWS Lambda function...
client = boto3.client('lambda')
create_lambda_function = client.create_function(
    FunctionName = 'glovo-pre-processing',
    Runtime = 'python3.8',
    Role = lambda_role,
    Description = 'Sample ML pipeline data pre-processing lambda',
    Code = {'ZipFile': code},
    Handler='{}.lambda_handler'.format('lambda_function'),
    Timeout = 600,
    MemorySize = 1024,
    Environment = {
        'Variables': {
            'BUCKET': bucket,
            'PREFIX': prefix,
            'FILENAME': filename
        }
    }
)

----------

### Building our ML pipeline with the AWS Step Functions Data Science SDK

You are now ready to create the actual steps of your ML pipeline. You will start by preparing the Amazon SageMaker training job, and the data pre-processing step that invokes the AWS Lambda function you created before.

In [None]:
xgb = sagemaker.estimator.Estimator(
    get_image_uri(boto3.Session().region_name, 'xgboost'),
    sagemaker_execution_role, 
    train_instance_count = 1, 
    train_instance_type = 'ml.m5.xlarge',
    train_volume_size = 5,
    output_path = 's3://{}/{}/output'.format(bucket, prefix),
    sagemaker_session = session
)

xgb.set_hyperparameters(objective='binary:logistic',
                         alpha = 0.0017958870152480393,
                         colsample_bytree = 0.8974444697232986,
                         eta = 0.378416419404957,
                         gamma = 0.0038479366336815115,
                         max_depth = 22,
                         min_child_weight = 3.4445863514152535,
                         num_round = 139,
                         subsample = 0.7432022124726009
                        )

In [None]:
# SageMaker expects unique names for jobs/models/endpoints. Pass these for each execution via placeholders:
execution_input = ExecutionInput(schema={
    'JobName': str, 
    'ModelName': str
})

In [None]:
preparation_step = steps.LambdaStep(
    'Preparing data (Lambda)',
    parameters={  
        "FunctionName": "glovo-pre-processing",
        "Payload": {  
           "JobName": execution_input['JobName']
        }
    }
)

preparation_step.add_retry(steps.Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=15,
    max_attempts=2,
    backoff_rate=4.0
))
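With this retry policy, a failed Lambda invocation is retried up to 2 times: Step Functions waits `IntervalSeconds` before the first retry and multiplies the wait by `BackoffRate` for each further retry. The small sketch below computes that schedule; `retry_delays` is a hypothetical helper, and it ignores newer retrier options (such as a maximum delay or jitter), which would change the result if configured.

```python
# Sketch of the wait schedule of a Step Functions Retrier: the first retry
# waits interval_seconds, each later retry multiplies the wait by backoff_rate,
# for at most max_attempts retries. Max-delay/jitter options are ignored.
def retry_delays(interval_seconds: float, max_attempts: int, backoff_rate: float):
    return [interval_seconds * backoff_rate ** attempt for attempt in range(max_attempts)]


# Values used for preparation_step (and later for validation_lambda_step).
print(retry_delays(interval_seconds=15, max_attempts=2, backoff_rate=4.0))  # [15.0, 60.0]
```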

In [None]:
training_step = steps.TrainingStep(
    'Training (SageMaker)', 
    estimator=xgb,
    data={
        'train': sagemaker.s3_input(s3_data='s3://{}/{}/train'.format(bucket, prefix), content_type='csv'),
        'validation': sagemaker.s3_input(s3_data='s3://{}/{}/validation'.format(bucket, prefix), content_type='csv')
    },
    job_name=execution_input['JobName']  
)

In [None]:
model_step = steps.ModelStep(
    'Save model (SageMaker)',
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName']  
)

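You will now add another AWS Lambda function that checks the performance of your trained model once the training job completes. In this case, it reads the validation metric that the Amazon SageMaker built-in XGBoost algorithm reports by default (with `objective='binary:logistic'` this is typically the error rate or the log loss, depending on the XGBoost version); the notebook refers to it as the validation loss.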
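The metric comes from the `FinalMetricDataList` field of SageMaker's `DescribeTrainingJob` response, which lists every final metric (training and validation) as a `MetricName` / `Value` pair, along with a timestamp. Selecting the validation entry by name, rather than by its position in the list, can be tested locally with a hand-written response. In the sketch below, the metric names and values are made up, and `validation_metric` is a hypothetical helper.

```python
# Made-up, trimmed-down example of DescribeTrainingJob's 'FinalMetricDataList'
# (real entries also carry a 'Timestamp').
sample_response = {
    "FinalMetricDataList": [
        {"MetricName": "train:error", "Value": 0.012},
        {"MetricName": "validation:error", "Value": 0.021},
    ]
}


def validation_metric(final_metric_data_list, prefix="validation:"):
    """Return the value of the first metric whose name starts with `prefix`."""
    for metric in final_metric_data_list:
        if metric["MetricName"].startswith(prefix):
            return metric["Value"]
    raise ValueError("No metric starting with {!r} found".format(prefix))


print(validation_metric(sample_response["FinalMetricDataList"]))  # 0.021
```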

In [None]:
# Create the validation AWS Lambda:
file_name = 'lambda_function.py'
def MakeFile(file_name):
    with open(file_name, 'w') as f:
        f.write('''\
import json
import boto3
def lambda_handler(event, context):
    sm = boto3.client('sagemaker')
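    metrics = sm.describe_training_job(TrainingJobName=event['JobName'])['FinalMetricDataList']
    # pick the validation metric by name instead of relying on its position in the list
    vloss = next(m['Value'] for m in metrics if m['MetricName'].startswith('validation:'))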
    print(vloss)
    return {
        'statusCode': 200,
        'vloss': json.dumps(vloss)
    }
        ''')
MakeFile(file_name)

# create the zip file...
with zipfile.ZipFile('lambda_function.zip', 'w', zipfile.ZIP_DEFLATED) as zipObj:
    zipObj.write('lambda_function.py')
!rm -f lambda_function.py

# check that the package exists (the old try/finally raised NameError if open() failed)
if os.path.exists("lambda_function.zip"):
    print("Lambda file created: lambda_function.zip")
else:
    print("Error - Lambda file not created")

# Loads the zip file as binary code. 
with open('lambda_function.zip', 'rb') as f: 
    code = f.read()

client = boto3.client('lambda')
create_lambda_function = client.create_function(
    FunctionName = 'glovo-validation',
    Runtime = 'python3.8',
    Role = lambda_role,
    Handler = '{}.lambda_handler'.format('lambda_function'),
    Description = 'Sample ML pipeline validation metric lambda',
    Code = {'ZipFile': code},
    Timeout = 600,
    MemorySize = 128
)


In [None]:
validation_lambda_step = steps.LambdaStep(
    'Validating loss (Lambda)',
    parameters={  
        "FunctionName": "glovo-validation",
        "Payload": {  
           "JobName": execution_input['JobName']
        }
    }
)

validation_lambda_step.add_retry(steps.Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=15,
    max_attempts=2,
    backoff_rate=4.0
))

To illustrate a decision logic that is common in ML workflows: you might want to automate the deployment of new models to production whenever new data arrives and the model is re-trained, for example by comparing the new model's performance against a given threshold.

In this example, the validation loss decides whether the ML pipeline continues and runs inferences with the new model, or stops because the new model does not meet the threshold (a validation loss below 3%, chosen just to illustrate the concept). This mechanism automates re-training on new data: for example, you could trigger this ML pipeline every day with the data from your most recent customer transactions.
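One caveat about the threshold check below: the validation Lambda returns the loss as a string (`json.dumps(vloss)`), and the Choice state uses `StringLessThan` / `StringGreaterThanEquals`, which compare text, not numbers. That works for values like `0.021`, but Python serializes very small floats in scientific notation (`1e-05`), which sorts after `"0.03"`. The sketch below simulates both comparisons locally; `string_rule` and `numeric_rule` are hypothetical helpers that only mimic the Choice rules.

```python
import json

THRESHOLD = "0.03"  # the string threshold used by the Choice state


def string_rule(vloss: float) -> bool:
    # Mimics StringLessThan: compare the JSON text of the loss with "0.03".
    return json.dumps(vloss) < THRESHOLD


def numeric_rule(vloss: float) -> bool:
    # Mimics a numeric comparison against the same threshold.
    return vloss < float(THRESHOLD)


for vloss in (0.021, 0.05, 1e-05):
    print(vloss, json.dumps(vloss), string_rule(vloss), numeric_rule(vloss))
# 0.021 0.021 True True
# 0.05 0.05 False False
# 1e-05 1e-05 False True   <- a very small loss is rejected by the string rule
```

If this matters for your metric, one option (not tested here) is to return `vloss` as a number from the Lambda and use `steps.ChoiceRule.NumericLessThan` / `NumericGreaterThanEquals` instead.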

In [None]:
transform_step = steps.TransformStep(
    'Batch inference (SageMaker)',
    transformer=xgb.transformer(
        instance_count=1,
        instance_type='ml.m5.large'
    ),
    job_name=execution_input['JobName'],     
    model_name=execution_input['ModelName'], 
    data='s3://{}/{}/test'.format(bucket, prefix),
    content_type='text/csv',
    split_type='Line'
)

In [None]:
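worse_step = steps.Pass(
    'Worse model',
    parameters={
        "Error": "The new model is not accurate enough.",
        # str() on a Placeholder does not read the runtime value; passing the
        # Placeholder itself lets the SDK resolve it from the Lambda output at run time
        "ValidationLoss": validation_lambda_step.output()["Payload"]["vloss"]
    }
)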

In [None]:
choice_state = steps.Choice(
    state_id='validation loss < 3% ?' #your desired threshold for validation loss
)

In [None]:
#Loss values with your desired threshold
choice_state.add_choice(
    rule=steps.ChoiceRule.StringLessThan(variable=validation_lambda_step.output()["Payload"]["vloss"], value="0.03"),
    next_step=transform_step
)
choice_state.add_choice(
    rule=steps.ChoiceRule.StringGreaterThanEquals(variable=validation_lambda_step.output()["Payload"]["vloss"], value="0.03"),
    next_step=worse_step
)

Assuming the validation loss is good enough, you can now run an Amazon SageMaker Batch Transform job to perform inferences on your entire test dataset.
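Once the Batch Transform job finishes, its output holds one predicted default probability per input row of `test.csv`, in the same order. The sketch below turns that text into a list of flagged rows. It is a hedged sketch: the exact output layout depends on the container and transformer settings (the parser accepts either one value per line or comma-separated values), the 0.5 cut-off is an arbitrary assumption, the helper names are hypothetical, and mapping rows back to couriers would need an identifier kept alongside the test rows.

```python
import re


# Hypothetical post-processing of the Batch Transform output text.
def parse_scores(output_text: str):
    # Accept one probability per line or comma-separated probabilities.
    return [float(tok) for tok in re.split(r"[,\s]+", output_text) if tok]


def flag_likely_defaulters(scores, cutoff=0.5):
    """Return the row indices whose predicted default probability is >= cutoff."""
    return [i for i, score in enumerate(scores) if score >= cutoff]


sample_output = "0.02\n0.91\n0.48\n0.73\n"  # made-up example content
scores = parse_scores(sample_output)
print(flag_likely_defaulters(scores))  # [1, 3]
```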

*Note: in this case you are not creating an 'Endpoint Configuration' or an 'Endpoint'. Should you need them to serve real-time inferences in your use case, you can follow the steps in the AWS Step Functions Data Science SDK examples, similar to the following lines:*
```
# 'EndpointName': str must also be added to the ExecutionInput schema defined earlier
endpoint_config_step = steps.EndpointConfigStep(
    "Create Endpoint Config",
    endpoint_config_name=execution_input['ModelName'],
    model_name=execution_input['ModelName'],
    initial_instance_count=1,
    instance_type='ml.m5.large'
)

endpoint_step = steps.EndpointStep(
    "Create Endpoint",
    endpoint_name=execution_input['EndpointName'],
    endpoint_config_name=execution_input['ModelName']
)
```

You are now ready to chain the steps of your ML pipeline with the AWS Step Functions Data Science SDK, and to set up the workflow with the create command.

In [None]:
workflow_definition = steps.Chain([
    preparation_step,
    training_step,
    model_step,
    validation_lambda_step,
    choice_state
])

In [None]:
workflow = Workflow(
    name='glovo-courier-notebook',
    definition=workflow_definition,
    role=workflow_execution_role,
    execution_input=execution_input
)

In [None]:
workflow.render_graph(portrait=False)

Keep in mind that when you use the SDK in Jupyter notebooks, you define the workflows locally in the notebook instance, but **they do not actually exist on AWS Step Functions until the “create” command is called**. Similarly, **they are not executed until the “execute” command is called**, after which you can track their progress in the notebook.

In [None]:
workflow.create()

Now you are ready to test this pipeline by calling the execute command, and to debug its execution logs if necessary.

In [None]:
execution = workflow.execute(
    inputs={
        'JobName': 'glovo-courier-default-{}'.format(uuid.uuid1().hex), # Each SageMaker job requires a unique name
        'ModelName': 'glovo-courier-default-{}'.format(uuid.uuid1().hex), # Each model requires a unique name
    }
)
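SageMaker training job, model, and transform job names must be unique and follow the pattern documented in the SageMaker API reference: at most 63 characters, letters, digits and hyphens only, starting with a letter or digit. The hypothetical helper below builds such a name and validates it before execution. It uses `uuid4` (random) instead of the notebook's `uuid1` (time and host based); both give a 32-character hex suffix.

```python
import re
import uuid

# Mirrors the documented SageMaker name pattern; the length check is explicit
# because hyphen runs let the regex accept more than 63 characters.
SAGEMAKER_NAME = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}$")


def unique_name(base: str) -> str:
    """Hypothetical helper: append a random hex suffix and validate the result."""
    name = "{}-{}".format(base, uuid.uuid4().hex)
    if len(name) > 63 or not SAGEMAKER_NAME.match(name):
        raise ValueError("Invalid SageMaker name: {}".format(name))
    return name


name = unique_name("glovo-courier-default")
print(name, len(name))  # e.g. glovo-courier-default-<32 hex chars> 54
# Possible use: workflow.execute(inputs={'JobName': unique_name('glovo-courier-default'),
#                                        'ModelName': unique_name('glovo-courier-default')})
```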

In [None]:
execution.render_progress()

*Note that you can re-run the previous cell until the execution completes. You can also check the execution details directly on the [AWS Step Functions console](https://eu-west-1.console.aws.amazon.com/states/) (the link opens the eu-west-1 Region; switch to your own Region if needed).*

You can also list the events with their details.

In [None]:
execution.list_events(html=True)

In [None]:
workflow.list_executions(html=True)

In [None]:
workflow.list_workflows(html=True)

You can even export the AWS CloudFormation template for the pipeline you have just built, in order to deploy it later on as infrastructure as code if required.

In [None]:
print(workflow.get_cloudformation_template())

Congratulations, your ML workflow is now complete, and the data scientists can pass this AWS CloudFormation template to the DevOps engineers to set up the full pipeline (see the next lab).

As a bonus, you can follow the steps in [this AWS Step Functions tutorial](https://docs.aws.amazon.com/step-functions/latest/dg/tutorial-cloudwatch-events-s3.html) to trigger this workflow automatically as soon as your Amazon S3 bucket receives new data, and so automate the re-training of the model.
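Roughly, the linked tutorial relies on AWS CloudTrail recording S3 `PutObject` calls and an Amazon CloudWatch Events (EventBridge) rule matching them, with the state machine as the rule's target. The sketch below only builds such an event pattern; the helper and bucket names are hypothetical, and the commented boto3 call is one way the rule could be registered. Keep in mind that this workflow expects unique `JobName` / `ModelName` inputs, so a fixed target input would fail on the second run; the tutorial's setup may need adapting for that.

```python
import json


# Hypothetical helper: event pattern matching S3 PutObject API calls on one
# bucket, as recorded by CloudTrail (requires a trail logging S3 data events).
def s3_put_event_pattern(bucket_name: str) -> dict:
    return {
        "source": ["aws.s3"],
        "detail-type": ["AWS API Call via CloudTrail"],
        "detail": {
            "eventSource": ["s3.amazonaws.com"],
            "eventName": ["PutObject"],
            "requestParameters": {"bucketName": [bucket_name]},
        },
    }


pattern = s3_put_event_pattern("my-example-bucket")
print(json.dumps(pattern, indent=2))
# e.g. boto3.client('events').put_rule(Name='glovo-new-data', EventPattern=json.dumps(pattern))
```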