# Automated Reciprocal Recommender Model Retraining and Inference Workflow

1. [Setup](#Setup)
1. [Create Resources](#Create-Resources)
1. [Build the Reciprocal Recommender Workflow](#Build-the-Reciprocal-Recommender-Workflow)
1. [Run the Workflow](#Run-the-Workflow)
1. [Clean Up](#Clean-Up)

## Setup

### Import the required modules

First, install and load the required modules.

In [None]:
import sys
!{sys.executable} -m pip install --upgrade stepfunctions

In [None]:
import uuid
import logging
import stepfunctions
import boto3
import sagemaker
import zipfile

from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker import s3_input
from sagemaker.s3 import S3Uploader
from stepfunctions import steps
from stepfunctions.steps import TrainingStep, ModelStep
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow

session = sagemaker.Session()
stepfunctions.set_stream_logger(level=logging.INFO)

region = boto3.Session().region_name
bucket = session.default_bucket()

lambda_client = boto3.client('lambda')

id = uuid.uuid4().hex

#Create unique names for the AWS Lambda functions to be created. If you change
#the default name, you may need to change the Step Functions execution role.
processing_function_name = 'query-processing-status'
create_preprocessing_function_name = 'create-preprocessing-job'
create_batch_pred_function_name = 'create-batch-pred-job'

#Create a unique name for the AWS Glue job to be created. If you change the 
#default name, you may need to change the Step Functions execution role.
glue_job_name = 'glue-batch-load-recs'

#Model endpoint name
endpoint_name = 'recommender-endpoint'

### Build and push Docker image to ECR

Next, we need to build the Docker image used for preprocessing, training, and inference and push it to ECR so that it is ready for use by SageMaker.

In [None]:
%%sh

algorithm_name=tap-up-recommender-tf

cd container

chmod +x training_code/train
chmod +x serving_code/serve

account=$(aws sts get-caller-identity --query Account --output text)

# Get the region defined in the current configuration (default to ap-northeast-1 if none defined)
region=$(aws configure get region)
region=${region:-ap-northeast-1}

fullname="${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:latest"

# If the repository doesn't exist in ECR, create it.

aws ecr describe-repositories --repository-names "${algorithm_name}" > /dev/null 2>&1

if [ $? -ne 0 ]
then
    aws ecr create-repository --repository-name "${algorithm_name}" > /dev/null
fi

# Get the login command from ECR and execute it directly
$(aws ecr get-login --region ${region} --no-include-email)

# Build the docker image locally with the image name and then push it to ECR
# with the full name.

docker build -t ${algorithm_name} -f Dockerfile.gpu .
docker tag ${algorithm_name} ${fullname}

docker push ${fullname}

In [None]:
training_container_uri = '987654321.dkr.ecr.ap-northeast-1.amazonaws.com/tap-up-recommender-tf:latest'
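
The URI above is hard-coded to a sample account and region. As a minimal sketch, the same string can be composed from its parts, mirroring the `fullname` variable in the shell cell. `build_ecr_image_uri` is a hypothetical helper introduced here for illustration; it is not part of the SageMaker SDK.

```python
def build_ecr_image_uri(account_id, region, repository, tag="latest"):
    """Compose an ECR image URI in the same layout as the shell cell's `fullname`."""
    return "{}.dkr.ecr.{}.amazonaws.com/{}:{}".format(account_id, region, repository, tag)


# Sample values only; substitute your own account ID and region.
example_uri = build_ecr_image_uri("987654321", "ap-northeast-1", "tap-up-recommender-tf")
print(example_uri)
```

In the notebook, the account ID could be read with `boto3.client('sts').get_caller_identity()['Account']` and the region taken from the `region` variable defined earlier.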

### Set up the IAM roles

Next, we'll create fine-grained IAM roles for the Lambda, Glue, and Step Functions resources that we will create. The IAM roles grant the services permissions within your AWS environment.


### Add permissions to your notebook role in IAM

The IAM role assumed by your notebook requires permission to create and run workflows in AWS Step Functions. If this notebook is running on a SageMaker notebook instance, do the following to provide IAM permissions to the notebook:

1. Open the Amazon [SageMaker console](https://console.aws.amazon.com/sagemaker/). 
2. Select **Notebook instances** and choose the name of your notebook instance.
3. Under **Permissions and encryption** select the role ARN to view the role on the IAM console.
4. Copy and save the IAM role ARN for later use. 
5. Choose **Attach policies** and search for `AWSStepFunctionsFullAccess`.
6. Select the check box next to `AWSStepFunctionsFullAccess` and choose **Attach policy**.

We also need to give the notebook instance permission to create an AWS Lambda function and an AWS Glue job. We will edit the managed policy attached to our role directly to incorporate these specific permissions:

1. Under **Permissions policies** expand the AmazonSageMaker-ExecutionPolicy-******** policy and choose **Edit policy**.
2. Select **Add additional permissions**. Choose **IAM**  for Service and **PassRole** for Actions.
3. Under Resources, choose **Specific**. Select **Add ARN** and enter `recommender-lambda-role` for **Role name with path*** and choose **Add**. You will create this role later on in this notebook.
4. Select **Add additional permissions** a second time. Choose **Lambda** for Service, **Write** for Access level, and **All resources** for Resources.
5. Select **Add additional permissions** a final time. Choose **Glue** for Service, **Write** for Access level, and **All resources** for Resources.
6. Choose **Review policy** and then **Save changes**.

If you are running this notebook outside of SageMaker, the SDK will use your configured AWS CLI configuration. For more information, see [Configuring the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html).

Next, let's create an execution role in IAM for Step Functions. 

### Create an Execution Role for Step Functions

Your Step Functions workflow requires an IAM role to interact with other services in your AWS environment. 

1. Go to the [IAM console](https://console.aws.amazon.com/iam/).
2. Select **Roles** and then **Create role**.
3. Under **Choose the service that will use this role** select **Step Functions**.
4. Choose **Next** until you can enter a **Role name**.
5. Enter a name such as `StepFunctionsWorkflowExecutionRole` and then select **Create role**.

Next, create and attach a policy to the role you created. As a best practice, the following steps will attach a policy that only provides access to the specific resources and actions needed for this solution.

1. Under the **Permissions** tab, click **Attach policies** and then **Create policy**.
2. Enter the following in the **JSON** tab:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "arn:aws:iam::987654321:role/service-role/AmazonSageMaker-ExecutionRole-20200220T987654",
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "sagemaker.amazonaws.com"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:CreateModel",
                "sagemaker:DeleteEndpointConfig",
                "sagemaker:DescribeTrainingJob",
                "sagemaker:CreateEndpoint",
                "sagemaker:StopProcessingJob",
                "sagemaker:StopTrainingJob",
                "sagemaker:StopTransformJob",
                "sagemaker:CreateTrainingJob",
                "sagemaker:CreateProcessingJob",
                "sagemaker:CreateTransformJob",
                "sagemaker:UpdateEndpoint",
                "sagemaker:CreateEndpointConfig",
                "sagemaker:DeleteEndpoint"
            ],
            "Resource": [
                "arn:aws:sagemaker:*:*:*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "events:DescribeRule",
                "events:PutRule",
                "events:PutTargets"
            ],
            "Resource": [
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule",
                "arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTransformJobsRule"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:*:*:function:query-training-status*",
                "arn:aws:lambda:*:*:function:query-processing-status*",
                "arn:aws:lambda:*:*:function:create-preprocessing-job*",
                "arn:aws:lambda:*:*:function:create-batch-pred-job*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:StartJobRun",
                "glue:GetJobRun",
                "glue:BatchStopJobRun",
                "glue:GetJobRuns"
            ],
            "Resource": "arn:aws:glue:*:*:job/glue-batch-load-recs*"
        }
    ]
}
```

3. Replace the example role ARN in the `Resource` field of the first statement with the ARN of your notebook role that you saved earlier.
4. Choose **Review policy** and give the policy a name such as `StepFunctionsWorkflowExecutionPolicy`.
5. Choose **Create policy**.
6. Select **Roles** and search for your `StepFunctionsWorkflowExecutionRole` role.
7. Under the **Permissions** tab, click **Attach policies**.
8. Search for your newly created `StepFunctionsWorkflowExecutionPolicy` policy and select the check box next to it.
9. Choose **Attach policy**. You will then be redirected to the details page for the role.
10. Copy the StepFunctionsWorkflowExecutionRole **Role ARN** at the top of the Summary.
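
If you prefer to generate the role-specific part of the policy rather than edit the JSON by hand, the following is a minimal sketch. `pass_role_statement` is a hypothetical helper, and the ARN passed to it is the sample value from the policy above, not a real role.

```python
import json


def pass_role_statement(notebook_role_arn):
    """Build the iam:PassRole statement for the given SageMaker execution role ARN."""
    return {
        "Effect": "Allow",
        "Action": "iam:PassRole",
        "Resource": notebook_role_arn,
        "Condition": {
            "StringEquals": {"iam:PassedToService": "sagemaker.amazonaws.com"}
        },
    }


# Sample ARN only; use the role ARN you saved from the notebook instance page.
statement = pass_role_statement(
    "arn:aws:iam::987654321:role/service-role/AmazonSageMaker-ExecutionRole-20200220T987654"
)
print(json.dumps(statement, indent=4))
```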

### Configure Execution Roles

In [None]:
# paste the StepFunctionsWorkflowExecutionRole ARN from above
workflow_execution_role = 'arn:aws:iam::987654321:role/StepFunctionsWorkflowExecutionRole'

# SageMaker Execution Role
# You can use sagemaker.get_execution_role() if running inside sagemaker's notebook instance
sagemaker_execution_role = sagemaker.get_execution_role() #Replace with ARN if not in an AWS SageMaker notebook

#### Create a Glue IAM Role
You need to create an IAM role so that you can create and execute an AWS Glue Job on your data in Amazon S3.

1. Go to the [IAM console](https://console.aws.amazon.com/iam/).
2. Select **Roles** and then **Create role**.
3. Under **Choose the service that will use this role** select **Glue**.
4. Choose **Next** until you can enter a **Role name**.
5. Enter a name such as `Glue-S3AthenaDDBPipeline` and then select **Create role**.

Next, create and attach a policy to the role you created. The following steps attach a managed policy that provides Glue access to the specific S3 bucket holding your data.

1. Under the **Permissions** tab, click **Attach policies** and then **Create policy**.
2. Enter the following in the **JSON** tab:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "PassRole",
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "*"
        },
        {
            "Sid": "GlueScriptPermissions",
            "Effect": "Allow",
            "Action": [
                "athena:BatchGetQueryExecution",
                "athena:GetQueryExecution",
                "athena:GetQueryResults",
                "athena:GetQueryResultsStream",
                "athena:GetWorkGroup",
                "dynamodb:BatchWriteItem",
                "glue:GetTable",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:PutObject",
                "s3:ReplicateObject",
                "s3:RestoreObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:athena:*:*:workgroup/*",
                "arn:aws:dynamodb:*:*:table/*",
                "arn:aws:glue:*:*:catalog",
                "arn:aws:glue:*:*:database/reciprocalrec",
                "arn:aws:glue:*:*:table/reciprocalrec/*",
                "arn:aws:s3:::sagemaker-ap-northeast-1-987654321",
                "arn:aws:s3:::sagemaker-ap-northeast-1-987654321/*"
            ]
        },
        {
            "Sid": "Logs",
            "Effect": "Allow",
            "Action": [
                "athena:GetCatalogs",
                "logs:Create*",
                "logs:Put*"
            ],
            "Resource": "*"
        }
    ]
}
```

3. Run the next cell (below) to retrieve the specific **S3 bucket name** that we will grant permissions to.

In [None]:
session = sagemaker.Session()
bucket = session.default_bucket()
print(bucket)

4. Copy the output of the above cell and use it to replace the example bucket name `sagemaker-ap-northeast-1-987654321` wherever it occurs in the JSON text that you entered. Similarly, replace each occurrence of the example database name `reciprocalrec` with the name of your database in Athena.
5. Choose **Review policy** and give the policy a name such as `S3AthenaGlueDDBPipelinePolicy`.
6. Choose **Create policy**.
7. Select **Roles**, then search for and select your `Glue-S3AthenaDDBPipeline` role.
8. Under the **Permissions** tab, click **Attach policies**.
9. Search for your newly created `S3AthenaGlueDDBPipelinePolicy` policy and select the check box next to it.
10. Choose **Attach policy**. You will then be redirected to the details page for the role.
11. Copy the **Role ARN** at the top of the Summary tab.
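
The bucket and database substitutions described above can also be done programmatically. This is a minimal sketch; `glue_policy_resources` is a hypothetical helper that reproduces the `Resource` list of the `GlueScriptPermissions` statement for whatever names you pass in.

```python
def glue_policy_resources(bucket_name, database_name):
    """Return the Resource list of the GlueScriptPermissions statement."""
    return [
        "arn:aws:athena:*:*:workgroup/*",
        "arn:aws:dynamodb:*:*:table/*",
        "arn:aws:glue:*:*:catalog",
        "arn:aws:glue:*:*:database/{}".format(database_name),
        "arn:aws:glue:*:*:table/{}/*".format(database_name),
        "arn:aws:s3:::{}".format(bucket_name),
        "arn:aws:s3:::{}/*".format(bucket_name),
    ]


# Sample names taken from the policy above; replace them with your own.
resources = glue_policy_resources("sagemaker-ap-northeast-1-987654321", "reciprocalrec")
for arn in resources:
    print(arn)
```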

In [None]:
# paste the Glue-S3AthenaDDBPipeline role ARN from above
glue_role = 'arn:aws:iam::987654321:role/Glue-S3AthenaDDBPipeline'

#### Create a Lambda IAM Role
You also need to create an IAM role so that you can create and execute an AWS Lambda function stored in Amazon S3.

1. Go to the [IAM console](https://console.aws.amazon.com/iam/).
2. Select **Roles** and then **Create role**.
3. Under **Choose the service that will use this role** select **Lambda**.
4. Choose **Next** until you can enter a **Role name**.
5. Enter a name such as `recommender-lambda-role` and then select **Create role**.

Next, attach policies to the role you created. The following steps attach policies that provide Lambda access to DynamoDB, Step Functions, S3, and SageMaker.

1. Under the **Permissions** tab, click **Attach Policies**.
2. In the search box, type **SageMaker** and select **AmazonSageMakerFullAccess** from the populated list.
3. In the search box type **AWSLambda** and select **AWSLambdaBasicExecutionRole** from the populated list.
4. Create custom policies to access **DynamoDB**, **Systems Manager** and **StepFunctions** and select them.
5. Choose **Attach policy**. You will then be redirected to the details page for the role.
6. Copy the **Role ARN** at the top of the **Summary**.


In [None]:
# Paste the recommender-lambda-role role ARN from above
lambda_role = 'arn:aws:iam::987654321:role/recommender-lambda-role'

### Prepare the Dataset

In [None]:
project_name = 'reciprocal_rec_system'
input_key = 'data/input/'
input_data_source = 's3://{}/{}/{}'.format(bucket, project_name, input_key)

## Create Resources
In the following steps, we'll create the Lambda functions and the Glue job that are called from the Step Functions workflow.

### Create the AWS Lambda Function that polls Preprocessing and Batch Inference jobs to check their status periodically

In [None]:
query_processing_zip_name = 'query_processing_status.zip'
query_processing_lambda_source_code = './container/training_code/query_processing_status.py'

with zipfile.ZipFile(query_processing_zip_name, mode='w') as zf:
    zf.write(query_processing_lambda_source_code, arcname=query_processing_lambda_source_code.split('/')[-1])


S3Uploader.upload(local_path=query_processing_zip_name, 
                  desired_s3_uri='s3://{}/{}'.format(bucket, project_name),
                  session=session)
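
The `arcname` argument matters here: the handler is later registered as `query_processing_status.lambda_handler`, so the module has to sit at the root of the archive rather than under `container/training_code/`. The sketch below shows the same packaging step as a reusable function, using a throwaway file in a temporary directory. `zip_single_file` is a hypothetical helper name.

```python
import os
import tempfile
import zipfile


def zip_single_file(source_path, zip_path):
    """Write one file into a zip archive at the archive root and return the member name."""
    member_name = os.path.basename(source_path)
    with zipfile.ZipFile(zip_path, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.write(source_path, arcname=member_name)
    return member_name


# Self-contained demonstration with a throwaway handler file in a nested folder.
with tempfile.TemporaryDirectory() as workdir:
    source = os.path.join(workdir, "nested", "query_processing_status.py")
    os.makedirs(os.path.dirname(source))
    with open(source, "w") as f:
        f.write("def lambda_handler(event, context):\n    return {}\n")
    archive = os.path.join(workdir, "query_processing_status.zip")
    zip_single_file(source, archive)
    with zipfile.ZipFile(archive) as zf:
        members = zf.namelist()

print(members)
```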

In [None]:
response = lambda_client.create_function(
    FunctionName=processing_function_name,
    Runtime='python3.7',
    Role=lambda_role,
    Handler='query_processing_status.lambda_handler',
    Code={
        'S3Bucket': bucket,
        'S3Key': '{}/{}'.format(project_name, query_processing_zip_name)
    },
    Description='Queries a SageMaker processing job and returns the results',
    Timeout=15,
    MemorySize=128
)
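
The source of `query_processing_status.py` is not shown in this notebook. Based on how the workflow uses it (it sends `ProcessingJobName` in the payload and reads `ProcessingJobStatus` from the result), a handler along the following lines would be compatible. Treat this as an assumption about its shape, not the actual file. The optional `sagemaker_client` parameter and the `FakeSageMakerClient` class are additions made here so the sketch can run without AWS access.

```python
def lambda_handler(event, context, sagemaker_client=None):
    """Return the status of the processing job named in the event."""
    if sagemaker_client is None:
        import boto3  # imported lazily so the sketch can be exercised without AWS access
        sagemaker_client = boto3.client("sagemaker")

    job_name = event["ProcessingJobName"]
    response = sagemaker_client.describe_processing_job(ProcessingJobName=job_name)
    return {
        "ProcessingJobName": job_name,
        "ProcessingJobStatus": response["ProcessingJobStatus"],
    }


class FakeSageMakerClient:
    """Stand-in client used only for this demonstration."""

    def describe_processing_job(self, ProcessingJobName):
        return {"ProcessingJobName": ProcessingJobName, "ProcessingJobStatus": "InProgress"}


result = lambda_handler({"ProcessingJobName": "demo-job"}, None, FakeSageMakerClient())
print(result)
```

Because the same function polls both the Preprocessing and the Batch Inference jobs, both are presumably run as SageMaker processing jobs.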

### Create the AWS Lambda function that creates and triggers the Preprocessing job

In [None]:
etl_zip_name = 'create_preprocessing_job.zip'
etl_source_code = 'container/training_code/create_preprocessing_job.py'

with zipfile.ZipFile(etl_zip_name, 'w') as zf:
    zf.write(etl_source_code, arcname=etl_source_code.split('/')[-1])

S3Uploader.upload(local_path=etl_zip_name, 
                  desired_s3_uri='s3://{}/{}'.format(bucket, project_name),
                  session=session)

In [None]:
response = lambda_client.create_function(
    FunctionName=create_preprocessing_function_name,
    Runtime='python3.7',
    Role=lambda_role,
    Handler='create_preprocessing_job.lambda_handler',
    Code={
        'S3Bucket': bucket,
        'S3Key': '{}/{}'.format(project_name, etl_zip_name)
    },
    Description='Triggers the preprocessing job that makes the data suitable for training',
    Timeout=15,
    MemorySize=128
)
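
The source of `create_preprocessing_job.py` is likewise not shown. The workflow later sends it a `Configuration` payload with the keys `JobName`, `IAMRole`, `LocalStorageSizeGB`, `S3InputDataPath`, `S3OutputDataPath`, and `EcrContainerUri`. The sketch below shows one way such a payload could be mapped onto the arguments of the SageMaker `create_processing_job` API. The function name, the instance type, the instance count, and the container paths are all assumptions made for illustration.

```python
def build_processing_request(configuration, instance_type="ml.m5.xlarge"):
    """Map the workflow's Configuration payload to create_processing_job arguments."""
    return {
        "ProcessingJobName": configuration["JobName"],
        "RoleArn": configuration["IAMRole"],
        "AppSpecification": {"ImageUri": configuration["EcrContainerUri"]},
        "ProcessingResources": {
            "ClusterConfig": {
                "InstanceCount": 1,  # assumed; not specified in the notebook
                "InstanceType": instance_type,  # assumed default
                "VolumeSizeInGB": configuration["LocalStorageSizeGB"],
            }
        },
        "ProcessingInputs": [
            {
                "InputName": "input",
                "S3Input": {
                    "S3Uri": configuration["S3InputDataPath"],
                    "LocalPath": "/opt/ml/processing/input",  # assumed path
                    "S3DataType": "S3Prefix",
                    "S3InputMode": "File",
                },
            }
        ],
        "ProcessingOutputConfig": {
            "Outputs": [
                {
                    "OutputName": "output",
                    "S3Output": {
                        "S3Uri": configuration["S3OutputDataPath"],
                        "LocalPath": "/opt/ml/processing/output",  # assumed path
                        "S3UploadMode": "EndOfJob",
                    },
                }
            ]
        },
    }


# Sample payload with placeholder values.
request = build_processing_request({
    "JobName": "preprocessing-demo",
    "IAMRole": "arn:aws:iam::987654321:role/example-role",
    "LocalStorageSizeGB": 50,
    "S3InputDataPath": "s3://example-bucket/reciprocal_rec_system/data/input/",
    "S3OutputDataPath": "s3://example-bucket/reciprocal_rec_system/data/preprocessed/",
    "EcrContainerUri": "987654321.dkr.ecr.ap-northeast-1.amazonaws.com/tap-up-recommender-tf:latest",
})
print(request["ProcessingJobName"])
```

Inside a handler, the resulting dictionary could be passed as `boto3.client('sagemaker').create_processing_job(**request)`.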

### Create the AWS Lambda function that creates and triggers the Batch Inference job


In [None]:
batch_pred_zip_name = 'create_batch_pred_job.zip'
batch_pred_source_code = 'container/training_code/create_batch_pred_job.py'

with zipfile.ZipFile(batch_pred_zip_name, 'w') as zf:
    zf.write(batch_pred_source_code, arcname=batch_pred_source_code.split('/')[-1])

S3Uploader.upload(local_path=batch_pred_zip_name, 
                  desired_s3_uri='s3://{}/{}'.format(bucket, project_name),
                  session=session)

In [None]:
response = lambda_client.create_function(
    FunctionName=create_batch_pred_function_name,
    Runtime='python3.7',
    Role=lambda_role,
    Handler='create_batch_pred_job.lambda_handler',
    Code={
        'S3Bucket': bucket,
        'S3Key': '{}/{}'.format(project_name, batch_pred_zip_name)
    },
    Description='Triggers the inference job that generates the recommendations',
    Timeout=15,
    MemorySize=128
)

### Create the AWS Glue Job

In [None]:
glue_script_location = S3Uploader.upload(local_path='./container/training_code/glue_batch_load_recs.py',
                               desired_s3_uri='s3://{}/{}'.format(bucket, project_name),
                               session=session)
glue_client = boto3.client('glue')

response = glue_client.create_job(
    Name=glue_job_name,
    Description='PySpark job to extract the parquet data from S3 and load it to DynamoDB',
    Role=glue_role,
    ExecutionProperty={
        'MaxConcurrentRuns': 2
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': glue_script_location,
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--job-language': 'python'
    },
#    MaxCapacity=30,
    GlueVersion='1.0',
    WorkerType='G.1X',
    NumberOfWorkers=2,
    Timeout=100
)

### Configure the Recommender Estimator for training

In [None]:
skipgram_estimator = sagemaker.estimator.Estimator(image_name=training_container_uri,
                                    role=sagemaker_execution_role, 
                                    train_instance_count=1, 
                                    train_instance_type='ml.p2.8xlarge',
                                    hyperparameters={'vector_size': 50, 'epoch_count': 40, 'batch_value': 32768},
                                    output_path='s3://{}/{}/data/model'.format(bucket, project_name),
                                    metric_definitions=[{'Name': 'train:loss', 'Regex': '.*loss:\\s*(\\S+).*'}],
                                    enable_sagemaker_metrics=True,
                                    input_mode= 'File')
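
To see what the `train:loss` metric definition captures, the pattern can be tried locally against a few log lines. The log-line formats below are hypothetical; the real format depends on what the training container prints. This sketch uses Python's `re` module, and SageMaker's own matching may differ in detail.

```python
import re

# Same pattern as the 'train:loss' metric definition above.
LOSS_PATTERN = re.compile(r".*loss:\s*(\S+).*")


def extract_loss(log_line):
    """Return the captured loss value as a string, or None if the line does not match."""
    match = LOSS_PATTERN.search(log_line)
    return match.group(1) if match else None


single = extract_loss("Epoch 3/40 - loss: 0.1234")
double = extract_loss("Epoch 3/40 - loss: 0.5000 - val_loss: 0.7000")
missing = extract_loss("Epoch 3/40 started")
print(single, double, missing)
```

Because the leading `.*` is greedy, a line containing more than one `loss:` yields the value after the last one (here the `val_loss` value), which is worth keeping in mind if the container logs validation loss on the same line.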

## Build the Reciprocal Recommender Workflow

### Set up the workflow input schema

In [None]:
execution_input = ExecutionInput(schema={
    'PreprocessingJobName': str,
    'TrainingJobName': str,
    'ProcessingLambdaFunctionName': str,
    'BatchPredJobName': str,
    'CreateBatchPredLambdaFunctionName': str,
    'CreatePreprocessingLambdaFunctionName': str,
    'GlueBatchJobName': str,
    'ModelName': str,
    'S3ModelPath': str,
    'S3PreprocessedPath': str,
    'S3RecommendationsPath': str,
    'EndpointName': str,
    'DoTraining': bool,
    'DoPreprocessing': bool,
    'DoBatchRecommend': bool,
    'CreateNewEndpoint': bool
})
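
The four `bool` entries drive the Choice states later in the workflow, which compare them with `BooleanEquals`; a string such as `"true"` would likely not match either branch. A quick local check of the inputs before starting an execution can catch that. `validate_inputs` below is a hypothetical helper, shown against a two-key subset of the schema.

```python
def validate_inputs(inputs, schema):
    """Return a list of problems found when checking inputs against a {name: type} schema."""
    problems = []
    for name, expected_type in schema.items():
        if name not in inputs:
            problems.append("missing: {}".format(name))
        elif not isinstance(inputs[name], expected_type):
            problems.append("{} should be {}, got {}".format(
                name, expected_type.__name__, type(inputs[name]).__name__))
    for name in inputs:
        if name not in schema:
            problems.append("unexpected: {}".format(name))
    return problems


# A small subset of the schema above, for illustration.
example_schema = {"TrainingJobName": str, "DoTraining": bool}

ok = validate_inputs({"TrainingJobName": "job-1", "DoTraining": True}, example_schema)
bad = validate_inputs({"TrainingJobName": "job-1", "DoTraining": "true"}, example_schema)
print(ok)
print(bad)
```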

## Create the pipelines in the workflow in reverse starting from the end (inference, training and preprocessing)

## Inference Pipeline
### Create a Batch Prediction Step
Next, we create a batch prediction step that generates the recommendations for all the users in the dataset and saves the results in S3.

In [None]:
batch_pred_configuration = dict(
    JobName=execution_input['BatchPredJobName'],
    IAMRole=sagemaker_execution_role,
    LocalStorageSizeGB=50,
    S3InputDataPathModelData = execution_input['S3ModelPath'],
    S3OutputDataPath=execution_input['S3RecommendationsPath'],
    EcrContainerUri=training_container_uri,
)

In [None]:
create_batch_pred_job_step = steps.compute.LambdaStep(
    "Create Batch Inference Job",
    parameters={  
        "FunctionName": execution_input['CreateBatchPredLambdaFunctionName'],
        "Payload": {  
           "Configuration": batch_pred_configuration
        }
    },
    result_path='$.CreateBatchPredLambdaResult'
)

### Create a lambda step to query BatchPred job status

In [None]:
lambda_step_batch_pred = steps.compute.LambdaStep(
    'Query Batch Inference Results',
    parameters={  
        "FunctionName": execution_input['ProcessingLambdaFunctionName'],
        "Payload":{
            "ProcessingJobName.$": "$.BatchPredJobName"
        }
    },
    result_path='$.BatchPredLambdaResult'
)

### Create a wait state for 60s before querying BatchPred every time

In [None]:
check_batch_pred_job_wait_state = steps.states.Wait(
    "Wait-2: 60 secs",
    seconds=60
)

### Create a workflow Success Step

In [None]:
success_step = steps.states.Succeed(
    'Recommender Workflow Succeeded',
    comment='Final state'
)

### Create BatchPrediction Failure Step

In [None]:
batch_pred_fail_step = steps.states.Fail(
    "Batch Inference Failed",
    comment = "Could not generate recommendations"
)

### Create a batch recommendations generation step with AWS Glue
In the following cell, we create a Glue step that runs an AWS Glue job. The Glue job extracts the recommendations data from the parquet files in S3, transforms it into the required format where necessary, and then saves it to DynamoDB. Glue performs this extract, transform, and load (ETL) work in a serverless fashion, so there are no compute resources to configure and manage. See the [GlueStartJobRunStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.GlueStartJobRunStep) Compute step in the AWS Step Functions Data Science SDK documentation.

In [None]:
load_recs_to_ddb_step = steps.GlueStartJobRunStep(
    'Load DDB Recommendations',
    parameters={"JobName": execution_input['GlueBatchJobName'],
                "Arguments":{
                    '--S3_SOURCE': execution_input['S3RecommendationsPath'],
                    '--DDB_DEST': 'recommendations'}
               }
)
load_recs_to_ddb_step.next(success_step)

### Create a BatchPred Choice State Step 
In the following cell, we create a choice step in order to build a dynamic workflow. This choice step branches on the result of our Query Batch Inference Results step: if the BatchPred job failed, the workflow stops; if it completed, the results are loaded into DynamoDB; if it is still running, the workflow keeps polling.

In [None]:
check_job_choice_batch_pred = steps.states.Choice(
    "Check Batch Inference Status"
)

batch_pred_failed = steps.choice_rule.ChoiceRule.StringEquals(variable=lambda_step_batch_pred.output()['BatchPredLambdaResult']['Payload']['ProcessingJobStatus'], value='Failed')
batch_pred_running = steps.choice_rule.ChoiceRule.StringEquals(variable=lambda_step_batch_pred.output()['BatchPredLambdaResult']['Payload']['ProcessingJobStatus'], value='InProgress')
batch_pred_finished = steps.choice_rule.ChoiceRule.StringEquals(variable=lambda_step_batch_pred.output()['BatchPredLambdaResult']['Payload']['ProcessingJobStatus'], value='Completed')

check_job_choice_batch_pred.add_choice(
    rule = batch_pred_running,
    next_step=lambda_step_batch_pred
)

check_job_choice_batch_pred.add_choice(
    rule = batch_pred_failed,
    next_step = batch_pred_fail_step
)

check_job_choice_batch_pred.add_choice(
    rule = batch_pred_finished,
    next_step=load_recs_to_ddb_step,
)
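
The three rules above cover `InProgress`, `Failed`, and `Completed`. A processing job can also report `Stopping` or `Stopped`, and a Choice state with no matching rule and no default fails with `States.NoChoiceMatched`. The plain-Python sketch below mirrors the routing so that gap is easy to see. `route_job_status` is a hypothetical stand-in for the Choice state, not SDK code.

```python
def route_job_status(status):
    """Mirror the three Choice rules: return the name of the next state for a job status."""
    routes = {
        "InProgress": "Query Batch Inference Results",
        "Failed": "Batch Inference Failed",
        "Completed": "Load DDB Recommendations",
    }
    if status not in routes:
        # Step Functions raises States.NoChoiceMatched when no rule matches
        # and the Choice state has no default.
        raise LookupError("States.NoChoiceMatched: {}".format(status))
    return routes[status]


for status in ("InProgress", "Failed", "Completed"):
    print(status, "->", route_job_status(status))

try:
    route_job_status("Stopped")
    unmatched = False
except LookupError:
    unmatched = True
print("Stopped is unmatched:", unmatched)
```

One option, if stopped jobs should end the workflow cleanly, is to route them to the failure step through the Choice state's default branch (`default_choice` in the SDK).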

### Link all the Steps Together
Finally, create your workflow definition by chaining all of the steps together that we've created. See [Chain](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.states.Chain) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
inference_workflow_definition = steps.Chain([
    create_batch_pred_job_step, 
    lambda_step_batch_pred,
    check_batch_pred_job_wait_state,
    check_job_choice_batch_pred
])

### Create an entrypoint to the inference pipeline

In [None]:
inference_entry_checkpoint = steps.states.Choice(
    "Do Inference?"
)

skip_batch_pred = steps.choice_rule.ChoiceRule.BooleanEquals(variable='$.DoBatchRecommend', value=False)
do_batch_pred = steps.choice_rule.ChoiceRule.BooleanEquals(variable='$.DoBatchRecommend', value=True)

inference_entry_checkpoint.add_choice(
    rule = skip_batch_pred,
    next_step=success_step
)

inference_entry_checkpoint.add_choice(
    rule = do_batch_pred,
    next_step=inference_workflow_definition
)

## Training Pipeline
### Create a SageMaker Training Step 

In the following cell, we create the training step and pass the estimator we defined above. See  [TrainingStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
import os
training_step = steps.TrainingStep(
    'Train Model',
    data={
        'training': s3_input(execution_input['S3PreprocessedPath']),
    },
    estimator=skipgram_estimator,
    job_name=execution_input['TrainingJobName'],
    wait_for_completion=True,
    result_path='$.TrainingJobResults'
)

### Create a Model Step 

In the following cell, we define a model step that will create a model in Amazon SageMaker using the artifacts created during the TrainingStep. See  [ModelStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ModelStep) in the AWS Step Functions Data Science SDK documentation to learn more.

The model creation step typically follows the training step. The Step Functions SDK provides the [get_expected_model](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.TrainingStep.get_expected_model) method in the TrainingStep class to provide a reference for the trained model artifacts. Please note that this method is only useful when the ModelStep directly follows the TrainingStep.

In [None]:
model_step = steps.ModelStep(
    'Save Model',
    model=training_step.get_expected_model(),
    model_name=execution_input['ModelName'],
    instance_type='ml.m5.12xlarge',
    input_path = '$.TrainingJobResults',
    result_path='$.ModelStepResults',
)

### Create an Endpoint Configuration Step
In the following cell we create an endpoint configuration step. See [EndpointConfigStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.EndpointConfigStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
endpoint_config_step = steps.EndpointConfigStep(
    "Create Model Endpoint Config",
    endpoint_config_name=execution_input['ModelName'],
    model_name=execution_input['ModelName'],
    initial_instance_count=1,
    instance_type='ml.m5.12xlarge',
    result_path='$.EndpointConfigResults'
)

### Create the Model Endpoint Step
In the following cell, we create the Endpoint step to deploy the new model as a managed API endpoint, creating a new SageMaker endpoint.

In [None]:
create_endpoint_step = steps.EndpointStep(
    'Create Model Endpoint',
    endpoint_name=execution_input['EndpointName'],
    endpoint_config_name=execution_input['ModelName'],
    update=False,
    result_path='$.EndpointUpdateResults'
)
create_endpoint_step.next(inference_entry_checkpoint)

### Update the Model Endpoint Step
In the following cell, we create the Endpoint step to deploy the new model as a managed API endpoint, updating an existing SageMaker endpoint.

In [None]:
update_endpoint_step = steps.EndpointStep(
    'Update Model Endpoint',
    endpoint_name=execution_input['EndpointName'],
    endpoint_config_name=execution_input['ModelName'],
    update=True,
    result_path='$.EndpointUpdateResults'
)
update_endpoint_step.next(inference_entry_checkpoint)

### Create an Endpoint Choice State Step

In [None]:
check_endpoint_choice = steps.states.Choice(
    "Check Endpoint Status"
)

do_endpoint_update = steps.choice_rule.ChoiceRule.BooleanEquals(variable='$.CreateNewEndpoint', value=False)
do_endpoint_create = steps.choice_rule.ChoiceRule.BooleanEquals(variable='$.CreateNewEndpoint', value=True)

check_endpoint_choice.add_choice(
    rule = do_endpoint_update,
    next_step=update_endpoint_step
)

check_endpoint_choice.add_choice(
    rule = do_endpoint_create,
    next_step=create_endpoint_step
)

### Link all the Training steps together
Finally, create your workflow definition by chaining all of the training pipeline steps together that we've created.

In [None]:
train_workflow_definition = steps.Chain([
    training_step,
    model_step,
    endpoint_config_step,
    check_endpoint_choice
])

### Create an entrypoint to the training pipeline

In [None]:
train_entry_checkpoint = steps.states.Choice(
    "Do Training?"
)

skip_training = steps.choice_rule.ChoiceRule.BooleanEquals(variable='$.DoTraining', value=False)
do_training = steps.choice_rule.ChoiceRule.BooleanEquals(variable='$.DoTraining', value=True)

train_entry_checkpoint.add_choice(
    rule = skip_training,
    next_step=inference_entry_checkpoint
)

train_entry_checkpoint.add_choice(
    rule = do_training,
    next_step=train_workflow_definition
)

## Preprocessing Pipeline
### Create a Preprocessing Step

In the following cell, we trigger the preprocessing step using an AWS Lambda function and pass the filters for preprocessing. See  [ProcessingStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/sagemaker.html#stepfunctions.steps.sagemaker.ProcessingStep) in the AWS Step Functions Data Science SDK documentation to learn more.

In [None]:
data_preprocessing_configuration = dict(
    JobName=execution_input['PreprocessingJobName'],
    IAMRole=sagemaker_execution_role,
    LocalStorageSizeGB=50,
    S3InputDataPath=input_data_source,
    S3OutputDataPath=execution_input['S3PreprocessedPath'],
    EcrContainerUri=training_container_uri,
)

In [None]:
create_preprocessing_job_step = steps.compute.LambdaStep(
    "Create Preprocessing Job",
    parameters={  
        "FunctionName": execution_input['CreatePreprocessingLambdaFunctionName'],
        "Payload": {  
           "Configuration": data_preprocessing_configuration
        }
    },
    result_path='$.CreatePreprocessingLambdaResult'
)

### Create a lambda step to query Preprocessing status

In [None]:
lambda_step_preprocessing = steps.compute.LambdaStep(
    'Query Preprocessing Results',
    parameters={  
        "FunctionName": execution_input['ProcessingLambdaFunctionName'],
        "Payload":{
            "ProcessingJobName.$": "$.PreprocessingJobName"
        }
    },
    result_path='$.PreprocessingLambdaResult'
)

### Create a wait state for 60s before querying Preprocessing every time

In [None]:
check_preprocessing_job_wait_state = steps.states.Wait(
    "Wait-1: 60 secs",
    seconds=60
)

### Create a Preprocessing Failure Step

In [None]:
preprocessing_fail_step = steps.states.Fail(
    'Preprocessing Failed',
    comment='Error while preprocessing data for training'
)

### Create a Preprocessing Choice State Step 
In the following cell, we create a choice step in order to build a dynamic workflow. This choice step branches on the result of our Query Preprocessing Results step: if the Preprocessing job failed, the workflow stops; if it completed, the workflow proceeds to Training; if it is still running, the workflow keeps polling.

In [None]:
check_job_choice_preprocessing = steps.states.Choice(
    "Check Preprocessing Status"
)

# Placeholder for the job status returned by the Query Preprocessing Results step
preprocessing_status = lambda_step_preprocessing.output()['PreprocessingLambdaResult']['Payload']['ProcessingJobStatus']

preprocessing_failed = steps.choice_rule.ChoiceRule.StringEquals(variable=preprocessing_status, value='Failed')
preprocessing_running = steps.choice_rule.ChoiceRule.StringEquals(variable=preprocessing_status, value='InProgress')
preprocessing_finished = steps.choice_rule.ChoiceRule.StringEquals(variable=preprocessing_status, value='Completed')

check_job_choice_preprocessing.add_choice(
    rule = preprocessing_running,
    next_step=lambda_step_preprocessing
)

check_job_choice_preprocessing.add_choice(
    rule = preprocessing_failed,
    next_step = preprocessing_fail_step
)

check_job_choice_preprocessing.add_choice(
    rule = preprocessing_finished,
    next_step=train_entry_checkpoint
)
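The three rules cover the `InProgress`, `Failed`, and `Completed` statuses. SageMaker processing jobs can also report `Stopping` and `Stopped`, and a Step Functions Choice state with no default branch fails with `States.NoChoiceMatched` when no rule matches. The plain-Python model below is not SDK code; it only mimics the routing so the loop behaviour is easy to reason about. The route targets are labelled with the notebook's Python variable names, and `NoChoiceMatched` is a stand-in exception defined here.

```python
# Status -> next step, mirroring the three add_choice calls.
ROUTES = {
    "InProgress": "lambda_step_preprocessing",
    "Failed": "preprocessing_fail_step",
    "Completed": "train_entry_checkpoint",
}


class NoChoiceMatched(Exception):
    """Stands in for the States.NoChoiceMatched runtime error."""


def route(status):
    """Return the step that the Choice state would transition to."""
    try:
        return ROUTES[status]
    except KeyError:
        raise NoChoiceMatched(status) from None


def simulate(statuses):
    """Walk a sequence of polled statuses until the polling loop exits."""
    polls = 0
    for status in statuses:
        polls += 1
        target = route(status)
        if target != "lambda_step_preprocessing":
            return target, polls
    raise RuntimeError("status sequence ended while the job was still in progress")


print(simulate(["InProgress", "InProgress", "Completed"]))
# prints ('train_entry_checkpoint', 3)
```

If stopped jobs are a realistic case for your pipeline, consider routing them explicitly, for example to the failure step.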

### Link all steps together

In [None]:
preprocess_workflow_definition = steps.Chain([
    create_preprocessing_job_step, 
    lambda_step_preprocessing,
    check_preprocessing_job_wait_state,
    check_job_choice_preprocessing,
])

### Create an entrypoint to the preprocessing pipeline

In [None]:
preprocessing_entry_checkpoint = steps.states.Choice(
    "Do Preprocessing?"
)

skip_preprocessing = steps.choice_rule.ChoiceRule.BooleanEquals(variable='$.DoPreprocessing', value=False)
do_preprocessing = steps.choice_rule.ChoiceRule.BooleanEquals(variable='$.DoPreprocessing', value=True)

preprocessing_entry_checkpoint.add_choice(
    rule = skip_preprocessing,
    next_step=train_entry_checkpoint
)

preprocessing_entry_checkpoint.add_choice(
    rule = do_preprocessing,
    next_step=preprocess_workflow_definition
)

## Run the Workflow
Create your workflow using the workflow definition above, and render the graph with [render_graph](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.render_graph):

In [None]:
e2e_workflow = Workflow(
    name='End2End-Routine-{}'.format(id),
    definition=steps.Chain([preprocessing_entry_checkpoint]),
    role=workflow_execution_role,
    execution_input=execution_input
)

In [None]:
e2e_workflow.render_graph()

In [None]:
e2e_workflow.create()

Run the workflow with [execute](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.execute):

In [None]:
execution = e2e_workflow.execute(
    inputs={
        'ProcessingLambdaFunctionName': processing_function_name,
        'CreatePreprocessingLambdaFunctionName': create_preprocessing_function_name,
        'CreateBatchPredLambdaFunctionName': create_batch_pred_function_name,
        'PreprocessingJobName': 'user-transform-etl-{}'.format(id),
        'TrainingJobName': 'train-{}'.format(id),
        'ModelName': 'recommender-model-{}'.format(id),
        'EndpointName': endpoint_name,
        'BatchPredJobName': 'recommender-batch-transform-{}'.format(id),
        'GlueBatchJobName': glue_job_name,
        'S3ModelPath': 's3://{}/{}/data/model/train-{}/output'.format(bucket, project_name, id),
        'S3PreprocessedPath': 's3://{}/{}/data/train/preprocessed-{}'.format(bucket, project_name, id),
        'S3RecommendationsPath': 's3://{}/{}/data/output/recommendations-{}'.format(bucket, project_name, id),
        'DoPreprocessing': True,
        'DoTraining': True,
        'DoBatchRecommend': True,
        'CreateNewEndpoint': True
    }
)
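Some mistakes in the execution inputs only surface after the state machine has started. For example, `BooleanEquals` rules compare against JSON booleans, so passing the string `'True'` for `DoPreprocessing` would match neither branch. The sketch below is an optional pre-flight check, not part of the original workflow. The key groupings follow the `execute` call above, while the name pattern and the 63-character limit reflect the constraints documented for SageMaker job names and should be treated as assumptions to verify.

```python
import re

# Assumed SageMaker naming constraints: alphanumerics and hyphens, no trailing hyphen.
JOB_NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9])*")
MAX_JOB_NAME_LENGTH = 63

BOOLEAN_FLAGS = ("DoPreprocessing", "DoTraining", "DoBatchRecommend", "CreateNewEndpoint")
JOB_NAME_KEYS = ("PreprocessingJobName", "TrainingJobName", "ModelName", "BatchPredJobName")
S3_PATH_KEYS = ("S3ModelPath", "S3PreprocessedPath", "S3RecommendationsPath")


def validate_execution_inputs(inputs):
    """Return a list of human-readable problems; an empty list means no issue was found."""
    problems = []
    for key in BOOLEAN_FLAGS:
        if not isinstance(inputs.get(key), bool):
            problems.append("{} must be a Python bool".format(key))
    for key in JOB_NAME_KEYS:
        value = inputs.get(key)
        valid = (
            isinstance(value, str)
            and len(value) <= MAX_JOB_NAME_LENGTH
            and JOB_NAME_PATTERN.fullmatch(value) is not None
        )
        if not valid:
            problems.append("{} is not a valid job name: {!r}".format(key, value))
    for key in S3_PATH_KEYS:
        value = inputs.get(key)
        if not (isinstance(value, str) and value.startswith("s3://")):
            problems.append("{} must be an s3:// URI: {!r}".format(key, value))
    return problems
```

You could build the inputs dictionary first, call `validate_execution_inputs` on it, and only then pass it to `e2e_workflow.execute`.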

Render workflow progress with [render_progress](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.render_progress). This generates a snapshot of the current state of your workflow as it executes. The snapshot is a static image, so you must run the cell again to check progress:

In [None]:
execution.render_progress()
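Re-running the cell by hand works for short runs. For longer ones, a small polling helper can block until the execution reaches a terminal state. The sketch below assumes that `execution.describe()` returns the Step Functions `DescribeExecution` response, whose `status` field is one of `RUNNING`, `SUCCEEDED`, `FAILED`, `TIMED_OUT`, or `ABORTED`. The helper name and its default intervals are illustrative.

```python
import time

TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_execution(describe, poll_seconds=60, max_polls=120, sleep=time.sleep):
    """Poll describe() until the execution reaches a terminal status.

    describe: zero-argument callable returning a dict with a 'status' key.
    sleep: injectable so the helper can be tested without real delays.
    """
    for _ in range(max_polls):
        status = describe()["status"]
        if status in TERMINAL_STATUSES:
            return status
        sleep(poll_seconds)
    raise TimeoutError("execution still running after {} polls".format(max_polls))
```

In the notebook this might be called as `wait_for_execution(execution.describe)`, followed by one more `execution.render_progress()` to see the final graph.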

## Clean Up
When you are done, make sure to clean up your AWS account by deleting resources you won't be reusing. Uncomment the code below and run the cell to delete the Lambda functions, the Glue job, and the Step Functions state machine.

In [None]:
#lambda_client.delete_function(FunctionName=processing_function_name)
#lambda_client.delete_function(FunctionName=create_preprocessing_function_name)
#lambda_client.delete_function(FunctionName=create_batch_pred_function_name)
#Assumes glue_client is a boto3 Glue client created earlier, e.g. boto3.client('glue')
#glue_client.delete_job(JobName=glue_job_name)
#e2e_workflow.delete()