# Operationalize end-to-end Amazon Personalize model deployment process using AWS Step Functions Data Science SDK

1. [Introduction](#Introduction)
2. [Setup](#Setup)
3. [Step-01 Create Dataset Group](#Create-Dataset-Group)
4. [Step-02 Create Dataset Import Job](#Create-Dataset-Import-Job)
5. [Step-03 Select Recipe and Create Solution](#Create-Recipe-Solution)
6. [Step-04 Create Campaign](#Create-Campaign)
7. [Workflow](#Workflow)
8. [Generate-Recommendations](#Generate-Recommendations)

## Introduction

This notebook shows how to use the AWS Step Functions Data Science SDK to create and manage an Amazon Personalize workflow. The SDK is an open source library that lets data scientists create and execute machine learning workflows using AWS Step Functions. For more information, see the following:
* [AWS Step Functions](https://aws.amazon.com/step-functions/)
* [AWS Step Functions Developer Guide](https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html)
* [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io)

In this notebook, we use the SDK to define steps that create Personalize resources, link the steps together into workflows, and execute the workflows in AWS Step Functions.

For more information on Amazon Personalize, see the following:

* [Amazon Personalize](https://aws.amazon.com/personalize/)


## Setup

### Import required modules from the SDK

In [None]:
!pip install --upgrade stepfunctions

In [None]:
import boto3
import json
import numpy as np
import pandas as pd
import time

personalize = boto3.client('personalize')
personalize_runtime = boto3.client('personalize-runtime')


import stepfunctions
import logging

from stepfunctions.inputs import *
from stepfunctions.steps import *
from stepfunctions.workflow import Workflow

stepfunctions.set_stream_logger(level=logging.INFO)

In [None]:
AWS_REGION_NAME = "us-east-1" # replace with your region name

### Setup IAM Roles

#### Create an execution role for Step Functions

You need an execution role so that you can create and execute workflows in Step Functions.

1. Go to the [IAM console](https://console.aws.amazon.com/iam/)
2. Select **Roles** and then **Create role**.
3. Under **Choose the service that will use this role** select **Step Functions**
4. Choose **Next** until you can enter a **Role name**
5. Enter a name such as `StepFunctionsWorkflowExecutionRole` and then select **Create role**


Attach a policy to the role you created. The following steps attach a policy that grants full access to Amazon Personalize and allows invoking any Lambda function and passing any IAM role; as a good practice, you should grant access only to the resources you need.

1. Under the **Permissions** tab, click **Add inline policy**
2. Enter the following in the **JSON** tab

```json
{
    "Version": "2012-10-17",
    "Statement": [
    
        {
            "Effect": "Allow",
            "Action": [
                "personalize:*"
            ],
            "Resource": "*"
        },   

        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "events:PutTargets",
                "events:PutRule",
                "events:DescribeRule"
            ],
            "Resource": "*"
        }
    ]
}
```

3. Choose **Review policy** and give the policy a name such as `StepFunctionsWorkflowExecutionPolicy`
4. Choose **Create policy**. You will be redirected to the details page for the role.
5. Copy the **Role ARN** at the top of the **Summary**
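
As a hedged illustration of the least-privilege advice above, the sketch below builds a narrower variant of the policy in Python. The helper name `build_scoped_policy`, the account ID, and the decision to scope only the Lambda and `iam:PassRole` statements are assumptions made for illustration; adapt them to your own resources.

```python
import json


def build_scoped_policy(region, account_id, function_names, passable_role_arns):
    """Return an IAM policy document narrower than the wildcard version.

    Hypothetical helper: only the Lambda and PassRole statements are scoped;
    the Personalize and EventBridge statements keep "*" as in the policy above.
    """
    lambda_arns = [
        "arn:aws:lambda:{}:{}:function:{}".format(region, account_id, name)
        for name in function_names
    ]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": ["personalize:*"], "Resource": "*"},
            {"Effect": "Allow", "Action": ["lambda:InvokeFunction"], "Resource": lambda_arns},
            {"Effect": "Allow", "Action": ["iam:PassRole"], "Resource": list(passable_role_arns)},
            {
                "Effect": "Allow",
                "Action": ["events:PutTargets", "events:PutRule", "events:DescribeRule"],
                "Resource": "*",
            },
        ],
    }


# Placeholder account ID and a subset of the function names used in this notebook.
scoped_policy = build_scoped_policy(
    region="us-east-1",
    account_id="123456789012",
    function_names=["stepfunction-create-schema", "stepfunctioncreatedatagroup"],
    passable_role_arns=["arn:aws:iam::123456789012:role/personalize-role"],
)
policy_json = json.dumps(scoped_policy, indent=4)
print(policy_json)
```

The printed JSON can be pasted into the **JSON** tab in place of the wildcard policy.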



In [None]:
workflow_execution_role = "arn:aws:iam::[[account_id]]:role/StepFunctionsWorkflowExecutionRole" # paste the StepFunctionsWorkflowExecutionRole ARN from above

### Setup S3 location and filename
Create an Amazon S3 bucket to store the training dataset, then provide the bucket name and file name in the cell below:

In [None]:
bucket = "personalize-demo-use1"  # replace with the name of your S3 bucket
filename = "movie-lens-100k.csv"  # replace with a name that you want to save the dataset under

#### Attach Policy to S3 Bucket

In [None]:
s3 = boto3.client("s3", region_name=AWS_REGION_NAME)

policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "personalize.amazonaws.com"
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(bucket),
                "arn:aws:s3:::{}/*".format(bucket)
                
            ]
        }
    ]
}

s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))

# AmazonPersonalizeFullAccess provides access to any S3 bucket with a name that includes "personalize" or "Personalize" 
# if you would like to use a bucket with a different name, please consider creating and attaching a new policy
# that provides read access to your bucket or attaching the AmazonS3ReadOnlyAccess policy to the role


#### Create Personalize Role


In [None]:
iam = boto3.client("iam", region_name=AWS_REGION_NAME)

role_name = "personalize-role" # Create a personalize role

assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "personalize.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
    ]
}

create_role_response = iam.create_role(
    RoleName = role_name,
    AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
)


policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess"
iam.attach_role_policy(
    RoleName = role_name,
    PolicyArn = policy_arn
)

time.sleep(60) # wait for a minute to allow IAM role policy attachment to propagate

role_arn = create_role_response["Role"]["Arn"]
role_arn

## Data-Preparation

### Download, Prepare, and Upload Training Data

In [None]:
!pwd

In [None]:
!wget -N http://files.grouplens.org/datasets/movielens/ml-100k.zip
!unzip -o ml-100k.zip

In [None]:
data = pd.read_csv('./ml-100k/u.data', sep='\t', names=['USER_ID', 'ITEM_ID', 'RATING', 'TIMESTAMP'])
pd.set_option('display.max_rows', 5)
data

In [None]:
data = data[data['RATING'] > 2]   # keep only interactions with a rating above 2 (that is, 3 and higher)
interactions_data = data[['USER_ID', 'ITEM_ID', 'TIMESTAMP']] 
interactions_data.to_csv(filename, index=False)

boto3.Session().resource('s3').Bucket(bucket).Object('interactions/{}'.format(filename)).upload_file(filename)
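
Before uploading, it can help to confirm that the exported file has the column layout written by the cell above. The following is a minimal sketch using only the standard library; `check_interactions_csv` is a hypothetical helper, and the sample rows are illustrative values in the MovieLens format.

```python
import csv
import io

# Column names written by interactions_data.to_csv(...) in the cell above.
REQUIRED_COLUMNS = ["USER_ID", "ITEM_ID", "TIMESTAMP"]


def check_interactions_csv(text):
    """Return the number of data rows, raising ValueError on a malformed file."""
    reader = csv.DictReader(io.StringIO(text))
    if reader.fieldnames != REQUIRED_COLUMNS:
        raise ValueError("unexpected header: {}".format(reader.fieldnames))
    count = 0
    for row in reader:
        int(row["TIMESTAMP"])  # raises ValueError if the timestamp is not an integer
        count += 1
    return count


sample = "USER_ID,ITEM_ID,TIMESTAMP\n196,242,881250949\n298,474,884182806\n"
print(check_interactions_csv(sample))
```

To check the real file, pass the contents of `filename` instead of `sample`, for example `check_interactions_csv(open(filename).read())`.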

## Task-States

### Lambda Task state

A `Task` State in Step Functions represents a single unit of work performed by a workflow. Tasks can call Lambda functions and orchestrate other AWS services. See [AWS Service Integrations](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-service-integrations.html) in the *AWS Step Functions Developer Guide*.

The following cells create [LambdaStep](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/compute.html#stepfunctions.steps.compute.LambdaStep) states such as `lambda_state_schema`, and then configure each one to [Retry](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-error-handling.html#error-handling-retrying-after-an-error) if the Lambda function fails.

#### Create the Lambda functions

The Lambda task states in this workflow use Lambda functions **(Python 3.x)** that create and return Personalize resources such as a Schema, Datasetgroup, Dataset, Solution, and SolutionVersion. Create the following functions in the [Lambda console](https://console.aws.amazon.com/lambda/).

1. stepfunction-create-schema
2. stepfunctioncreatedatagroup
3. stepfunctioncreatedataset
4. stepfunction-createdatasetimportjob
5. stepfunction_select-recipe_create-solution
6. stepfunction_create_solution_version
7. stepfunction_getsolution_metric_create_campaign

Copy and paste the corresponding Lambda function code from the `./Lambda/` folder in the repo.


#### INFO
For each Lambda function, attach the **AmazonPersonalizeFullAccess** IAM policy, because the Lambda functions need to access the Personalize service.

#### <a name="Create-Dataset-Group"></a>Step-01 Create Dataset Group

![](./assets/step-01-create-datasets-group.png)

#### Create Schema

Before you add a dataset to Amazon Personalize, you must define a schema for that dataset. Once you define the schema and create the dataset, you can't make changes to the schema. For more information, refer to the Amazon Personalize documentation on datasets and schemas.
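
The actual Lambda code lives in the `./Lambda/` folder and is not reproduced here. As a rough sketch of what a schema-creating handler could look like, the function below assumes the event carries the schema name under `input` and that the result exposes `schemaArn`, which matches how the payload and `$.Payload.schemaArn` are used in this notebook. The function name `create_schema` and the injected `personalize_client` argument are assumptions; inside Lambda the client would be `boto3.client("personalize")`.

```python
import json

# Avro schema for an Interactions dataset with the three columns used in this notebook.
INTERACTIONS_SCHEMA = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {"name": "USER_ID", "type": "string"},
        {"name": "ITEM_ID", "type": "string"},
        {"name": "TIMESTAMP", "type": "long"},
    ],
    "version": "1.0",
}


def create_schema(personalize_client, event):
    """Create the schema named by event["input"] and return its ARN."""
    response = personalize_client.create_schema(
        name=event["input"],
        schema=json.dumps(INTERACTIONS_SCHEMA),  # the API expects the schema as a JSON string
    )
    return {"schemaArn": response["schemaArn"]}
```

Passing the client in as an argument keeps the function easy to exercise locally with a stub before deploying it.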

In [None]:
lambda_state_schema = LambdaStep(
    state_id="create schema",
    parameters={  
        "FunctionName": "stepfunction-create-schema", # replace with the name of the function you created
        "Payload": {  
           "input": "personalize-stepfunction-schema" # replace with your own schema name if you want
        }
    },
    result_path='$'
)

lambda_state_schema.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_schema.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("CreateSchemaTaskFailed")
))
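
To make the retry settings concrete, the sketch below computes the delays that a Step Functions retrier applies: the first retry waits `interval_seconds`, and each later retry multiplies the previous delay by `backoff_rate`. The helper `retry_delays` is hypothetical and ignores optional settings such as a maximum delay.

```python
def retry_delays(interval_seconds, max_attempts, backoff_rate):
    """Seconds waited before each retry attempt, in order."""
    return [interval_seconds * backoff_rate ** attempt for attempt in range(max_attempts)]


print(retry_delays(5, 1, 4.0))  # settings used in this notebook -> [5.0]
print(retry_delays(5, 3, 4.0))  # hypothetical three attempts -> [5.0, 20.0, 80.0]
```

With `max_attempts=1` there is a single retry after 5 seconds, so `backoff_rate` only starts to matter if you raise `max_attempts`.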

## Wait-States

#### A `Wait` state in Step Functions waits a specific amount of time. See [Wait](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Wait) in the AWS Step Functions Data Science SDK documentation.

#### Wait for Schema to be ready

In [None]:
wait_state_schema = Wait(
    state_id="Wait for create schema - 5 secs",
    seconds=5
)

#### Create Datasetgroup

This step creates an empty dataset group. A dataset group contains related datasets that supply data for training a model. A dataset group can contain at most three datasets, one for each type of dataset:

- Interactions
- Items
- Users

To train a model (create a solution), a dataset group that contains an Interactions dataset is required. Call `CreateDataset` to add a dataset to the group.

After creating the schema, we create another Step Functions state based on the Lambda function `stepfunctioncreatedatagroup.py` from the Lambda folder of the GitHub repo. The function calls the boto3 `create_dataset_group` API.

In [None]:
lambda_state_datasetgroup = LambdaStep(
    state_id="create dataset Group",
    parameters={  
        "FunctionName": "stepfunctioncreatedatagroup", #replace with the name of the function you created
        "Payload": {  
           "input": "personalize-stepfunction-dataset-group", 
           "schemaArn.$": '$.Payload.schemaArn'
        }
    },

    result_path='$'
)

lambda_state_datasetgroup.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_datasetgroup.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("CreateDataSetGroupTaskFailed")
))

#### Wait for Datasetgroup to be ready

In [None]:
wait_state_datasetgroup = Wait(
    state_id="Wait for create datasetgroup - 30 secs",
    seconds=30
)



### Check status of the lambda task and take action accordingly

#### If a state fails, move it to `Fail` state. See [Fail](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) in the AWS Step Functions Data Science SDK documentation.

#### Create a Lambda function for checking the status of the Lambda task

To check the status of the Lambda task, create the following Lambda function **(Python 3.x)** in the [Lambda console](https://console.aws.amazon.com/lambda/).

- stepfunction_waitforDatasetGroup

Copy and paste the corresponding Lambda function code from the `./Lambda/` folder in the repo.

#### INFO
For each Lambda function, attach the **AmazonPersonalizeFullAccess** IAM policy, because the Lambda functions need to access the Personalize service.
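
For orientation, a status-check function of this kind could be as small as the sketch below. It is not the code from the repo: the name `check_dataset_group_status`, the injected `personalize_client`, and the returned keys are assumptions, chosen to match the `input` and `schemaArn` payload keys used by the state in this section.

```python
def check_dataset_group_status(personalize_client, event):
    """Look up the dataset group named by event["input"] and report its status."""
    response = personalize_client.describe_dataset_group(
        datasetGroupArn=event["input"]
    )
    return {
        "status": response["datasetGroup"]["status"],  # e.g. "CREATE PENDING" or "ACTIVE"
        "datasetGroupArn": event["input"],
        "schemaArn": event["schemaArn"],  # passed through for later steps
    }
```

Inside Lambda, `personalize_client` would be `boto3.client("personalize")`.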

### check datasetgroup status

In [None]:
lambda_state_datasetgroupstatus = LambdaStep(
    state_id="check dataset Group status",
    parameters={  
        "FunctionName": "stepfunction_waitforDatasetGroup", #replace with the name of the function you created
        "Payload": {  
           "input.$": '$.Payload.datasetGroupArn',
           "schemaArn.$": '$.Payload.schemaArn'
        }
    },
    result_path = '$'
)

lambda_state_datasetgroupstatus.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_datasetgroupstatus.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("DatasetGroupStatusTaskFailed")
))

#### After chaining together the steps for the workflow path, we will define and visualize the workflow.

### Define Workflow

In the following cells, you define the steps used in the workflow, and then create, visualize, and execute the workflow.

Steps relate to states in AWS Step Functions. For more information, see [States](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-states.html) in the *AWS Step Functions Developer Guide*. For more information on the AWS Step Functions Data Science SDK APIs, see: https://aws-step-functions-data-science-sdk.readthedocs.io. 
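
To show how a linear chain of steps maps onto states, the sketch below links a list of state names with `Next` fields and marks the last one with `End`, which is how sequential flow is expressed in the Amazon States Language. This is a simplified illustration, not the SDK's actual output: `chain_to_asl` is a hypothetical helper and every state is given a placeholder `Pass` type.

```python
import json


def chain_to_asl(state_ids):
    """Link state ids sequentially into a minimal state machine definition."""
    states = {}
    for index, state_id in enumerate(state_ids):
        state = {"Type": "Pass"}  # placeholder; real states are Task, Wait, Choice, ...
        if index + 1 < len(state_ids):
            state["Next"] = state_ids[index + 1]  # hand over to the following state
        else:
            state["End"] = True  # the last state terminates the workflow
        states[state_id] = state
    return {"StartAt": state_ids[0], "States": states}


definition = chain_to_asl([
    "create schema",
    "Wait for create schema - 5 secs",
    "create dataset Group",
    "Wait for create datasetgroup - 30 secs",
    "check dataset Group status",
])
print(json.dumps(definition, indent=2))
```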




### Dataset workflow

In [None]:
DatasetGroup_workflow_definition = Chain([lambda_state_schema,
                                       wait_state_schema,
                                       lambda_state_datasetgroup,
                                       wait_state_datasetgroup,
                                       lambda_state_datasetgroupstatus])

In [None]:
DatasetGroup_workflow = Workflow(
    name="Dataset-workflow",
    definition=DatasetGroup_workflow_definition,
    role=workflow_execution_role
)

#### DatasetGroup workflow graph

![](./assets/dataset-group-workflow-graph.png)

In [None]:
DatasetGroup_workflow.render_graph()

In [None]:
DatasetGroupWorkflowArn = DatasetGroup_workflow.create()
DatasetGroupWorkflowArn

In [None]:
DatasetGroup_workflow_execution = DatasetGroup_workflow.execute()
DatasetGroup_workflow_execution

#### <a name="Create-Dataset-Import-Job"></a>Step-02 Create Dataset Import Job

![](./assets/step-02-create-dataset-import-job.png)

#### Create Dataset

Creates an empty dataset and adds it to the specified dataset group. Use CreateDatasetImportJob to import your training data to a dataset.

There are three types of datasets:

- Interactions
- Items
- Users

Each dataset type has an associated schema with required field types. Only the Interactions dataset is required in order to train a model (also referred to as creating a solution).

In [None]:
create_dataset_execution_input = ExecutionInput(schema={
    'name': str,
    'datasetType': str,
    'schemaArn': str,
    'datasetGroupArn': str
})
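
The `ExecutionInput` schema above declares which keys an execution expects and their types. As a hedged sketch, the same idea can be checked in plain Python before starting an execution; `validate_execution_inputs` is a hypothetical helper, independent of any validation the SDK itself performs, and the ARNs are placeholders.

```python
def validate_execution_inputs(schema, inputs):
    """Raise ValueError when inputs do not match the declared schema."""
    missing = sorted(key for key in schema if key not in inputs)
    if missing:
        raise ValueError("missing keys: {}".format(", ".join(missing)))
    wrong = sorted(
        key for key, expected in schema.items()
        if not isinstance(inputs[key], expected)
    )
    if wrong:
        raise ValueError("wrong type for: {}".format(", ".join(wrong)))


dataset_schema = {"name": str, "datasetType": str, "schemaArn": str, "datasetGroupArn": str}
dataset_inputs = {
    "name": "personalize-stepfunction-dataset",
    "datasetType": "INTERACTIONS",
    "schemaArn": "arn:aws:personalize:us-east-1:123456789012:schema/personalize-stepfunction-schema",
    "datasetGroupArn": "arn:aws:personalize:us-east-1:123456789012:dataset-group/personalize-stepfunction-dataset-group",
}
validate_execution_inputs(dataset_schema, dataset_inputs)  # passes silently
```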

In [None]:
lambda_state_createdataset = LambdaStep(
    state_id="create dataset",
    parameters={
        "FunctionName": "stepfunctioncreatedataset", #replace with the name of the function you created
        "Payload": {
            "name": create_dataset_execution_input["name"],
            "datasetType": create_dataset_execution_input["datasetType"],
            "schemaArn": create_dataset_execution_input["schemaArn"],
            "datasetGroupArn": create_dataset_execution_input["datasetGroupArn"],
            "datasetType.$": '$.datasetType',
            "schemaArn.$": '$.schemaArn',
            "datasetGroupArn.$": '$.datasetGroupArn',        
        }
    },
    result_path = '$'
)

lambda_state_createdataset.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_createdataset.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("CreateDataSetTaskFailed")
))

#### Wait for Dataset to be ready

In [None]:
wait_state_dataset = Wait(
    state_id="wait for create dataset - 30 secs",
    seconds=30
)

#### Create Dataset Import Job

After you have completed Step 1: Creating a Dataset Group and Step 2: Creating a Dataset and a Schema, you are ready to import your training data into Amazon Personalize. When you import data, you can choose to import records in bulk, import records individually, or both, depending on your business requirements and the amount of historical data you have collected. If you have a large number of historical records, we recommend you first import data in bulk and then add data incrementally as necessary.
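
For a bulk import, the Lambda function needs to call the Personalize `create_dataset_import_job` API with the S3 location and the role that Personalize assumes to read it. The sketch below is one way such a handler might look, not the repo's code: `start_dataset_import_job` and the injected `personalize_client` are assumptions, and the event keys mirror the payload used by the import-job state in this section.

```python
def start_dataset_import_job(personalize_client, event):
    """Start a bulk import from S3 and return the ARNs later steps need."""
    data_location = "s3://{}/{}".format(event["bucket_name"], event["file_name"])
    response = personalize_client.create_dataset_import_job(
        jobName=event["datasetimportjob"],
        datasetArn=event["datasetArn"],
        dataSource={"dataLocation": data_location},
        roleArn=event["role_arn"],  # role Personalize assumes to read the bucket
    )
    return {
        "datasetImportJobArn": response["datasetImportJobArn"],
        "datasetGroupArn": event["datasetGroupArn"],  # passed through unchanged
    }
```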

In [None]:
lambda_state_datasetimportjob = LambdaStep(
    state_id="create dataset import job",
    parameters={  
        "FunctionName": "stepfunction-createdatasetimportjob", #replace with the name of the function you created
        "Payload": {  
           "datasetimportjob": "stepfunction-createdatasetimportjob",
           "datasetArn.$": '$.Payload.datasetArn',
           "datasetGroupArn.$": '$.Payload.datasetGroupArn',
           "bucket_name": bucket,
           "file_name": 'interactions/{}'.format(filename),
           "role_arn": role_arn
        }
    },

    result_path = '$'
)

lambda_state_datasetimportjob.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_datasetimportjob.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("DatasetImportJobTaskFailed")
))

#### Wait for Dataset Import Job to be ACTIVE

In [None]:
wait_state_datasetimportjob = Wait(
    state_id="Wait for create datasetimportjob - 30 secs",
    seconds=30
)



### Check status of the lambda task and take action accordingly

#### If a state fails, move it to `Fail` state. See [Fail](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) in the AWS Step Functions Data Science SDK documentation.

#### Create a Lambda function for checking the status of the Lambda task

To check the status of the Lambda task, create the following Lambda function **(Python 3.x)** in the [Lambda console](https://console.aws.amazon.com/lambda/).

- stepfunction_waitfordatasetimportjob

Copy and paste the corresponding Lambda function code from the `./Lambda/` folder in the repo.

#### INFO
For each Lambda function, attach the **AmazonPersonalizeFullAccess** IAM policy, because the Lambda functions need to access the Personalize service.

### check dataset import job status

In [None]:
lambda_state_datasetimportjob_status = LambdaStep(
    state_id="check dataset import job status",
    parameters={  
        "FunctionName": "stepfunction_waitfordatasetimportjob", #replace with the name of the function you created
        "Payload": {  
           "datasetImportJobArn.$": '$.Payload.datasetImportJobArn',
           "datasetGroupArn.$": '$.Payload.datasetGroupArn'
        }
    },
    result_path = '$'
)

lambda_state_datasetimportjob_status.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_datasetimportjob_status.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("DatasetImportJobStatusTaskFailed")
))

## Choice-States

A `Choice` state adds branching logic to a workflow. Here it inspects the status returned by the status-check Lambda function and decides whether the workflow succeeds, waits and polls again, or fails. See *Choice Rules* in the [AWS Step Functions Data Science SDK documentation](https://aws-step-functions-data-science-sdk.readthedocs.io).

#### Create the Choice state and define the workflow path

The following cells create the Choice state and attach a branch for each status: `ACTIVE` ends the workflow in a Succeed state, `CREATE PENDING` and `CREATE IN_PROGRESS` loop back to the Wait state, and any other status goes to a Fail state.

In [None]:
datasetimportjob_choice_state = Choice(
    state_id="Is the DataSet Import Job ready?"
)

In [None]:
datasetimportjob_choice_state.add_choice(
    rule=ChoiceRule.StringEquals(variable=lambda_state_datasetimportjob_status.output()['Payload']['status'], value='ACTIVE'),
    next_step=Succeed("DatasetImportJobActive")
)

datasetimportjob_choice_state.add_choice(
    rule=ChoiceRule.StringEquals(variable=lambda_state_datasetimportjob_status.output()['Payload']['status'], value='CREATE PENDING'),
    next_step=wait_state_datasetimportjob
)

datasetimportjob_choice_state.add_choice(
    rule=ChoiceRule.StringEquals(variable=lambda_state_datasetimportjob_status.output()['Payload']['status'], value='CREATE IN_PROGRESS'),
    next_step=wait_state_datasetimportjob
)

datasetimportjob_choice_state.default_choice(next_step=Fail("dataset_import_job_failed"))
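
The branching above amounts to a small polling loop. The sketch below restates it in plain Python so the routing is easy to follow; `next_action` and `simulate` are hypothetical names, and the status strings are the ones matched by the choice rules.

```python
WAIT_STATUSES = {"CREATE PENDING", "CREATE IN_PROGRESS"}


def next_action(status):
    """Mirror the choice rules: succeed, wait and poll again, or fail."""
    if status == "ACTIVE":
        return "succeed"
    if status in WAIT_STATUSES:
        return "wait"
    return "fail"  # default branch, e.g. "CREATE FAILED"


def simulate(statuses):
    """Walk a sequence of polled statuses; return (outcome, number_of_polls)."""
    for polls, status in enumerate(statuses, start=1):
        action = next_action(status)
        if action != "wait":
            return action, polls
    return "wait", len(statuses)


print(simulate(["CREATE PENDING", "CREATE IN_PROGRESS", "ACTIVE"]))
print(simulate(["CREATE IN_PROGRESS", "CREATE FAILED"]))
```

In the workflow, each "wait" corresponds to one pass through the Wait state followed by another status check.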

#### After chaining together the steps for the workflow path, we will define and visualize the workflow.

### DatasetImportWorkflow

In [None]:
DatasetImport_workflow_definition=Chain([lambda_state_createdataset,
                                   wait_state_dataset,
                                   lambda_state_datasetimportjob,
                                   wait_state_datasetimportjob,
                                   lambda_state_datasetimportjob_status,
                                   datasetimportjob_choice_state
                                  ])

In [None]:
DatasetImport_workflow = Workflow(
    name="DatasetImport-workflow",
    definition=DatasetImport_workflow_definition,
    role=workflow_execution_role,
    execution_input=create_dataset_execution_input
)

#### DatasetImport workflow graph

![](./assets/dataset-import-workflow-graph.png)

In [None]:
DatasetImport_workflow.render_graph()

In [None]:
DatasetImportflowArn = DatasetImport_workflow.create()
DatasetImportflowArn

In [None]:
DatasetImport_workflow.update(definition=DatasetImport_workflow_definition)

In [None]:
DatasetImport_workflow_execution = DatasetImport_workflow.execute(inputs={
    "name": "personalize-stepfunction-dataset",
    "datasetType": "INTERACTIONS",
    "datasetGroupArn": "arn:aws:personalize:us-east-1:123456789012:dataset-group/personalize-stepfunction-dataset-group",
    "schemaArn": "arn:aws:personalize:us-east-1:123456789012:schema/personalize-stepfunction-schema"
})
DatasetImport_workflow_execution

#### Create Solution

Once you have finished Preparing and Importing Data, you are ready to create a Solution. A Solution refers to the combination of an Amazon Personalize recipe, customized parameters, and one or more solution versions (trained models). Once you create a solution with a solution version, you can create a campaign to deploy the solution version and get recommendations.

To create a solution in Amazon Personalize, you do the following:

1. **Choose a recipe.** A recipe is an Amazon Personalize term specifying an appropriate algorithm to train for a given use case. See Step 1: Choosing a Recipe.
2. **Configure a solution.** Customize solution parameters and recipe-specific hyperparameters so the model meets your specific business needs. See Step 2: Configuring a Solution.
3. **Create a solution version (train a model).** Train the machine learning model Amazon Personalize will use to generate recommendations for your customers. See Step 3: Creating a Solution Version.
4. **Evaluate the solution version.** Use the metrics Amazon Personalize generates from the new solution version to evaluate the performance of the model. See Step 4: Evaluating the Solution Version.


#### <a name="Create-Recipe-Solution"></a>Step-03 Select Recipe and Create Solution

![](./assets/step-03-choose-recipe-and-create-solution.png)

#### Choosing a Recipe and Configuring a Solution

A recipe is an Amazon Personalize term specifying an appropriate algorithm to train for a given use case. 
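
As a hedged sketch of what selecting a recipe and creating a solution involves, the function below turns a recipe name into a built-in recipe ARN and calls the Personalize `create_solution` API. It is not the repo's Lambda code: the name `select_recipe_create_solution`, the injected `personalize_client`, and the assumption that the event carries a recipe name (such as `aws-user-personalization`) rather than a full ARN are all illustrative.

```python
def select_recipe_create_solution(personalize_client, event):
    """Create a solution for the given dataset group and recipe name."""
    # Built-in recipes have ARNs of the form arn:aws:personalize:::recipe/<name>.
    recipe_arn = "arn:aws:personalize:::recipe/{}".format(event["recipe"])
    response = personalize_client.create_solution(
        name=event["solution_name"],
        datasetGroupArn=event["datasetGroupArn"],
        recipeArn=recipe_arn,
    )
    return {"solutionArn": response["solutionArn"]}
```

Inside Lambda, `personalize_client` would be `boto3.client("personalize")`.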

In [None]:
select_receipe_create_solution_execution_input = ExecutionInput(schema={
    'solution_name': str,
    'recipe': str,
    'datasetGroupArn': str
})

In [None]:
lambda_state_select_receipe_create_solution = LambdaStep(
    state_id="select receipe and create solution",
    parameters={  
        "FunctionName": "stepfunction_select-recipe_create-solution", #replace with the name of the function you created
        "Payload": {
            'solution_name': select_receipe_create_solution_execution_input['solution_name'],
            "recipe": select_receipe_create_solution_execution_input['recipe'],
            "datasetGroupArn": select_receipe_create_solution_execution_input['datasetGroupArn'],
            "datasetGroupArn.$": '$.datasetGroupArn'
        }
    },
    result_path = '$'
)

lambda_state_select_receipe_create_solution.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_select_receipe_create_solution.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("DatasetReceiptCreateSolutionTaskFailed")
))

#### Wait for Recipe to be ready

In [None]:
wait_state_receipe = Wait(
    state_id="Wait for receipe - 30 secs",
    seconds=30
)

#### Create Solution Version

Once you have completed Choosing a Recipe and Configuring a Solution, you are ready to create a Solution Version. A Solution Version refers to a trained machine learning model you can deploy to get recommendations for customers. You can create a solution version using the console, AWS Command Line Interface (AWS CLI), or AWS SDK.

In [None]:
lambda_create_solution_version = LambdaStep(
    state_id="create solution version",
    parameters={
        "FunctionName": "stepfunction_create_solution_version", 
        "Payload": {  
           "solutionArn.$": '$.Payload.solutionArn'           
        }
    },
    result_path = '$'
)

lambda_create_solution_version.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_create_solution_version.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("CreateSolutionVersionTaskFailed")
))

#### Wait for Solution Version to be ACTIVE

In [None]:
wait_state_solutionversion = Wait(
    state_id="Wait for solution version - 60 secs",
    seconds=60
)



### Check status of the lambda task and take action accordingly

#### If a state fails, move it to `Fail` state. See [Fail](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) in the AWS Step Functions Data Science SDK documentation.

#### Create a Lambda function for checking the status of the Lambda task

To check the status of the Lambda task, create the following Lambda function **(Python 3.x)** in the [Lambda console](https://console.aws.amazon.com/lambda/).

- stepfunction_waitforSolutionVersion

Copy and paste the corresponding Lambda function code from the `./Lambda/` folder in the repo.

#### INFO
For each Lambda function, attach the **AmazonPersonalizeFullAccess** IAM policy, because the Lambda functions need to access the Personalize service.

### check solution version status

In [None]:
lambda_state_solutionversion_status = LambdaStep(
    state_id="check solution version status",
    parameters={  
        "FunctionName": "stepfunction_waitforSolutionVersion", #replace with the name of the function you created
        "Payload": {  
           "solutionVersionArn.$": '$.Payload.solutionVersionArn'           
        }
    },
    result_path = '$'
)

lambda_state_solutionversion_status.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_solutionversion_status.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("SolutionVersionStatusTaskFailed")
))

## Choice-States

Now, create a Choice state for the solution version and attach branches to it. See *Choice Rules* in the [AWS Step Functions Data Science SDK documentation](https://aws-step-functions-data-science-sdk.readthedocs.io).

In [None]:
solutionversion_choice_state = Choice(
    state_id="Is the Solution Version ready?"
)

In [None]:
solutionversion_choice_state.add_choice(
    rule=ChoiceRule.StringEquals(variable=lambda_state_solutionversion_status.output()['Payload']['status'], value='ACTIVE'),
    next_step=Succeed("The Solution Version ready?")   
)

solutionversion_choice_state.add_choice(
    rule=ChoiceRule.StringEquals(variable=lambda_state_solutionversion_status.output()['Payload']['status'], value='CREATE PENDING'),
    next_step=wait_state_solutionversion
)

solutionversion_choice_state.add_choice(
    rule=ChoiceRule.StringEquals(variable=lambda_state_solutionversion_status.output()['Payload']['status'], value='CREATE IN_PROGRESS'),
    next_step=wait_state_solutionversion
)

solutionversion_choice_state.default_choice(next_step=Fail("CreateSolutionVersionFailed"))

#### Recipe and Solution workflow

In [None]:
Create_receipe_sol_workflow_definition = Chain([lambda_state_select_receipe_create_solution,
                                   wait_state_receipe,
                                   lambda_create_solution_version,
                                   wait_state_solutionversion,
                                   lambda_state_solutionversion_status,
                                   solutionversion_choice_state
                                  ])

In [None]:
Create_receipe_sol_workflow = Workflow(
    name="Create_receipe_sol-workflow",
    definition=Create_receipe_sol_workflow_definition,
    role=workflow_execution_role
)

#### Recipe & Solution workflow graph

![](./assets/recipe-solution-workflow-graph.png)

In [None]:
Create_receipe_sol_workflow.render_graph()

In [None]:
CreateReceipeArn = Create_receipe_sol_workflow.create()
CreateReceipeArn

In [None]:
Create_receipe_sol_workflow.update(definition=Create_receipe_sol_workflow_definition)

In [None]:
Create_receipe_sol_workflow.execute(inputs={
    "solution_name": "stepfunction-solution", #replace with your solution name
    "recipe": "aws-user-personalization", #replace with your recipe
    # replace with datasetGroupArn created in the previous step
    "datasetGroupArn": "arn:aws:personalize:us-east-1:123456789012:dataset-group/personalize-stepfunction-dataset-group"
})

#### Create Campaign

A campaign is used to make recommendations for your users. You create a campaign by deploying a solution version.
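
The function name `stepfunction_getsolution_metric_create_campaign` suggests that the Lambda reads the solution version's metrics before deploying it. The sketch below shows one way that could look using the Personalize `get_solution_metrics` and `create_campaign` APIs. The name `create_campaign_from_solution_version`, the injected `personalize_client`, the returned keys, and the `minProvisionedTPS=1` setting are assumptions rather than the repo's actual code.

```python
def create_campaign_from_solution_version(personalize_client, event):
    """Fetch offline metrics for a solution version, then deploy it as a campaign."""
    metrics = personalize_client.get_solution_metrics(
        solutionVersionArn=event["solutionVersionArn"]
    )["metrics"]
    response = personalize_client.create_campaign(
        name=event["campaign_name"],
        solutionVersionArn=event["solutionVersionArn"],
        minProvisionedTPS=1,  # assumed minimum throughput for a demo campaign
    )
    return {"campaignArn": response["campaignArn"], "metrics": metrics}
```

Returning the metrics alongside the campaign ARN makes them visible in the execution output without a separate call.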

#### <a name="Create-Campaign"></a>Step-04 Create Campaign

![](./assets/step-04-create-campaign.png)

In [None]:
create_campaign_execution_input = ExecutionInput(schema={
    'campaign_name': str,
    'solutionVersionArn': str,
})

In [None]:
lambda_create_campaign = LambdaStep(
    state_id="create campaign",
    parameters={  
        "FunctionName": "stepfunction_getsolution_metric_create_campaign", 
        "Payload": {
            "campaign_name": create_campaign_execution_input['campaign_name'],
            "solutionVersionArn": create_campaign_execution_input['solutionVersionArn'],
            "solutionVersionArn.$": '$.solutionVersionArn'
        }
    },
    result_path = '$'
)

lambda_create_campaign.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_create_campaign.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("CreateCampaignTaskFailed")
))

#### Wait for Campaign to be ACTIVE

In [None]:
wait_state_campaign = Wait(
    state_id="Wait for Campaign - 30 secs",
    seconds=30
)



### Check status of the lambda task and take action accordingly

#### If a state fails, move it to the `Fail` state. See [Fail](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/states.html#stepfunctions.steps.states.Fail) in the AWS Step Functions Data Science SDK documentation.

#### Create a Lambda function for checking the status of the Lambda task

To check the status of the Lambda task, create the following Lambda function **(Python 3.x)** in the [Lambda console](https://console.aws.amazon.com/lambda/).

- stepfunction_waitforCampaign

Copy and paste the corresponding Lambda function code from the ./Lambda/ folder in the repo.

#### INFO
For each Lambda function, attach the **AmazonPersonalizeFullAccess** IAM policy, because the Lambda functions have to access the Personalize service.
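
To give an idea of what a status-check function involves, here is a minimal sketch, assuming it simply wraps the boto3 `describe_campaign` call. It is not the code from the ./Lambda/ folder; the `client` argument and the shape of the return value are assumptions chosen to match the `campaignArn` and `status` fields that the workflow reads.

```python
def lambda_handler(event, context, client=None):
    """Sketch: report the current status of a Personalize campaign."""
    if client is None:
        # Imported lazily so the handler can be exercised with a stub client.
        import boto3
        client = boto3.client("personalize")

    campaign = client.describe_campaign(campaignArn=event["campaignArn"])["campaign"]

    # Return the ARN together with the status: when the workflow loops back
    # to this check, the previous result is the only input it has.
    return {"campaignArn": campaign["campaignArn"], "status": campaign["status"]}
```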

### Check campaign status

In [None]:
lambda_state_campaign_status = LambdaStep(
    state_id="check campaign status",
    parameters={  
        "FunctionName": "stepfunction_waitforCampaign", #replace with the name of the function you created
        "Payload": {  
           "campaignArn.$": '$.Payload.campaignArn'           
        }
    },
    result_path = '$'
)

lambda_state_campaign_status.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_campaign_status.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("CampaignStatusTaskFailed")
))
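
The retry settings can be easier to reason about as a list of delays. In Step Functions, the first retry happens after `IntervalSeconds`, each later retry multiplies the previous delay by `BackoffRate`, and `MaxAttempts` caps the number of retries. The helper below is only an illustration of that arithmetic; `retry_delays` is a hypothetical name, not part of the SDK.

```python
def retry_delays(interval_seconds, max_attempts, backoff_rate):
    """Return the wait (in seconds) before each retry attempt."""
    return [interval_seconds * backoff_rate ** attempt for attempt in range(max_attempts)]


# The settings used in this notebook: a single retry after 5 seconds.
print(retry_delays(5, 1, 4.0))  # [5.0]

# With three attempts the delays would grow by a factor of 4 each time.
print(retry_delays(5, 3, 4.0))  # [5.0, 20.0, 80.0]
```

With `max_attempts=1`, the `backoff_rate` never comes into play; it only matters if you raise the number of attempts.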

## Choice-States

Now, create a Choice state and attach branches to it. See *Choice Rules* in the [AWS Step Functions Data Science SDK documentation](https://aws-step-functions-data-science-sdk.readthedocs.io).

In [None]:
create_campaign_choice_state = Choice(
    state_id="Is the Campaign ready?"
)

In [None]:
create_campaign_choice_state.add_choice(
    rule=ChoiceRule.StringEquals(variable=lambda_state_campaign_status.output()['Payload']['status'], value='ACTIVE'),
    next_step=Succeed("CampaignCreatedSuccessfully")     
)

create_campaign_choice_state.add_choice(
    rule=ChoiceRule.StringEquals(variable=lambda_state_campaign_status.output()['Payload']['status'], value='CREATE PENDING'),
    next_step=wait_state_campaign
)

create_campaign_choice_state.add_choice(
    rule=ChoiceRule.StringEquals(variable=lambda_state_campaign_status.output()['Payload']['status'], value='CREATE IN_PROGRESS'),
    next_step=wait_state_campaign
)

create_campaign_choice_state.default_choice(next_step=Fail("CreateCampaignFailed"))
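
Read as plain Python, the choice rules above amount to a small routing function. This is only a restatement of the rules for clarity; `next_state` is a hypothetical helper and is not used by the workflow.

```python
WAIT_STATE = "Wait for Campaign - 30 secs"


def next_state(status):
    """Map a campaign status to the state the Choice state transitions to."""
    if status == "ACTIVE":
        return "CampaignCreatedSuccessfully"
    if status in ("CREATE PENDING", "CREATE IN_PROGRESS"):
        return WAIT_STATE
    # Anything else (for example "CREATE FAILED") falls through to the default.
    return "CreateCampaignFailed"


for status in ("ACTIVE", "CREATE PENDING", "CREATE IN_PROGRESS", "CREATE FAILED"):
    print(status, "->", next_state(status))
```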

#### Chain together the steps to define the workflow path

The following cell links together the steps you've created above into a sequential group. The new path sequentially includes the Lambda state, Wait state, and the Succeed state that you created earlier.
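
Because the Choice state can route back to the Wait state, the chained path is really a polling loop. The simplified trace below shows which states would be visited for a given sequence of campaign statuses. It is a plain-Python simulation under the assumption that each status check returns the next value in `statuses`; it does not call AWS, and `trace_campaign_workflow` is a hypothetical name.

```python
def trace_campaign_workflow(statuses):
    """Simulate the create -> wait -> check -> choose loop and list visited states."""
    visited = ["create campaign"]
    for status in statuses:
        visited += [
            "Wait for Campaign - 30 secs",
            "check campaign status",
            "Is the Campaign ready?",
        ]
        if status == "ACTIVE":
            return visited + ["CampaignCreatedSuccessfully"]
        if status not in ("CREATE PENDING", "CREATE IN_PROGRESS"):
            return visited + ["CreateCampaignFailed"]
    # The sample statuses ran out while the campaign was still being created.
    return visited


for state in trace_campaign_workflow(["CREATE PENDING", "CREATE IN_PROGRESS", "ACTIVE"]):
    print(state)
```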

#### After chaining together the steps for the workflow path, we will define and visualize the workflow.

#### Create Campaign Workflow

In [None]:
# wait_state_datasetimportjob belongs to the dataset import workflow and is not part of this chain
Create_Campaign_workflow_definition = Chain([lambda_create_campaign,
                                   wait_state_campaign,
                                   lambda_state_campaign_status,
                                   create_campaign_choice_state
                                  ])

In [None]:
Campaign_workflow = Workflow(
    name="Campaign-workflow",
    definition=Create_Campaign_workflow_definition,
    role=workflow_execution_role
)

#### Campaign workflow graph
![](./assets/campaign-workflow-graph.png)

In [None]:
Campaign_workflow.render_graph()

In [None]:
CreateCampaignArn = Campaign_workflow.create()
CreateCampaignArn

In [None]:
Campaign_workflow.update(definition=Create_Campaign_workflow_definition)

In [None]:
Campaign_workflow.execute(inputs={
    'campaign_name': "stepfunction-campaign", # replace with your campaign name
    # replace with the solutionVersionArn created in previous step
    'solutionVersionArn': "arn:aws:personalize:us-east-1:123456789012:solution/stepfunction-solution/06d2a809"
})

## Workflow

### Define Workflow

In the following cells, you will define the steps used in the main workflow. Then you will create, visualize, and execute the workflow.

Steps relate to states in AWS Step Functions. For more information, see [States](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-states.html) in the *AWS Step Functions Developer Guide*. For more information on the AWS Step Functions Data Science SDK APIs, see: https://aws-step-functions-data-science-sdk.readthedocs.io. 




#### Main workflow

![](./assets/personalization-workflow.png)

In [None]:
call_dataset_workflow_state = Task(
    state_id="DataSetWorkflow",
    resource="arn:aws:states:::states:startExecution.sync:2",
    parameters={
        "Input": "true",
        "StateMachineArn": DatasetGroupWorkflowArn
    }
)

In [None]:
call_datasetImport_workflow_state = Task(
    state_id="DataSetImportWorkflow",
    resource="arn:aws:states:::states:startExecution.sync:2",
    parameters={
        "Input":{
            "datasetType": "INTERACTIONS",
            "schemaArn.$": "$.Output.Payload.schemaArn",
            "datasetGroupArn.$": "$.Output.Payload.datasetGroupArn"
        },
        "StateMachineArn": DatasetImportflowArn,
    }
)
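
With the `startExecution.sync:2` integration, the result of a nested execution carries its `Output` as parsed JSON, which is why paths such as `$.Output.Payload.schemaArn` can reach into it. The sketch below resolves such a path against a made-up result; the ARNs are placeholders, and `resolve` is a hypothetical helper that only handles simple dotted paths, not full JSONPath.

```python
def resolve(path, document):
    """Resolve a simple dotted reference path such as '$.Output.Payload.x'."""
    if not path.startswith("$"):
        raise ValueError("reference paths must start with '$'")
    value = document
    for key in path[1:].split("."):
        if key:  # skip the empty piece before the first dot
            value = value[key]
    return value


# Made-up example of what a finished nested execution could hand to the next state.
nested_result = {
    "Status": "SUCCEEDED",
    "Output": {
        "Payload": {
            "schemaArn": "arn:aws:personalize:us-east-1:123456789012:schema/example",
            "datasetGroupArn": "arn:aws:personalize:us-east-1:123456789012:dataset-group/example",
        }
    },
}

print(resolve("$.Output.Payload.schemaArn", nested_result))
print(resolve("$.Output.Payload.datasetGroupArn", nested_result))
```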

In [None]:
call_receipe_solution_workflow_state = Task(
    state_id="ReceipeSolutionWorkflow",
    resource="arn:aws:states:::states:startExecution.sync:2",
    parameters={
        "Input":{
            "solution_name": "stepfunction-solution",
            "recipe": "aws-user-personalization",
            "datasetGroupArn.$": "$.Output.Payload.datasetGroupArn"
        },
        "StateMachineArn": CreateReceipeArn
    }
)

In [None]:
call_campaign_solution_workflow_state = Task(
    state_id="CampaignWorkflow",
    resource="arn:aws:states:::states:startExecution.sync:2",
    parameters={
        "Input":{
            "campaign_name": "stepfunction-campaign",
            "solutionVersionArn.$": "$.Output.Payload.solutionVersionArn"
        },
        "StateMachineArn": CreateCampaignArn
    }
)

In [None]:
Main_workflow_definition=Chain([call_dataset_workflow_state,
                                call_datasetImport_workflow_state,
                                call_receipe_solution_workflow_state,
                                call_campaign_solution_workflow_state
                               ])

In [None]:
Main_workflow = Workflow(
    name="UserPersionalization-workflow",
    definition=Main_workflow_definition,
    role=workflow_execution_role
)

#### Main workflow graph
![](./assets/main-workflow-graph.png)

In [None]:
Main_workflow.render_graph()

### Create and execute the workflow

In the next cells, we will create the workflow in AWS Step Functions with [create](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.create) and execute it with [execute](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.execute).


In [None]:
MainWorkflowArn = Main_workflow.create()
MainWorkflowArn

#### Update IAM Policy of StepFunctionsWorkflowExecutionRole

For a state machine that calls **StartExecution** for a single nested workflow execution, add an IAM policy to `StepFunctionsWorkflowExecutionRole` that limits permissions to that state machine.

For example,
```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "states:StartExecution"
            ],
            "Resource": [
                "arn:aws:states:[[region]]:[[accountId]]:stateMachine:[[stateMachineName]]"
            ]
        }
    ]
}
```

In [None]:
iam = boto3.client("iam", region_name=AWS_REGION_NAME)

role_name = "StepFunctionsWorkflowExecutionRole"

inline_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "states:StartExecution"
            ],
            "Resource": [
                MainWorkflowArn,
                # use the ARN variables that the main workflow tasks reference
                DatasetGroupWorkflowArn,
                DatasetImportflowArn,
                CreateCampaignArn,
                CreateReceipeArn
            ]
        }
    ]
}

put_role_policy_response = iam.put_role_policy(
    RoleName=role_name,
    PolicyName='StepFunctionsStateStartExecutionPolicy',
    PolicyDocument=json.dumps(inline_policy_document)
)

time.sleep(60)

In [None]:
Main_workflow_execution = Main_workflow.execute()


###  Review the workflow progress

Review the workflow progress with [render_progress](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.render_progress).

Review the execution history by calling [list_events](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.list_events) to list all events in the workflow execution.

In [None]:
Main_workflow_execution.render_progress()

In [None]:
Main_workflow_execution.list_events(html=True)

In [None]:
wf_execution_output = Main_workflow_execution.get_output()
campaign_arn = wf_execution_output['Output']['Payload']['campaign_arn']
campaign_arn

## Generate-Recommendations

### Now that we have a successful campaign, let's generate recommendations for the campaign

#### Select a User and an Item

In [None]:
items = pd.read_csv('./ml-100k/u.item', sep='|', usecols=[0,1], encoding='latin-1')
items.columns = ['ITEM_ID', 'TITLE']

user_id, item_id, rating, timestamp = data.sample().values[0]

user_id = int(user_id)
item_id = int(item_id)

print("user_id",user_id)
print("items",items)


item_title = items.loc[items['ITEM_ID'] == item_id].values[0][-1]
print("USER: {}".format(user_id))
print("ITEM: {}".format(item_title))
print("ITEM ID: {}".format(item_id))

In [None]:
wait_recommendations = Wait(
    state_id="Wait for recommendations - 10 secs",
    seconds=10
)

#### Lambda Task

Create the `stepfunction_getRecommendations` Lambda function **(Python 3.x)** in the [Lambda console](https://console.aws.amazon.com/lambda/).


Copy and paste the corresponding Lambda function code from the ./Lambda/ folder in the repo.
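
For orientation, a recommendations function could be as small as the sketch below. It assumes a user-based recipe, so it passes only `userId` to the boto3 `get_recommendations` call and ignores the `item_id` in the payload; an item-based recipe would pass `itemId` instead. The `client` argument and the `item_list` return key are assumptions chosen to match how this notebook reads the output, not the code from the ./Lambda/ folder.

```python
def lambda_handler(event, context, client=None):
    """Sketch: fetch recommendations for one user from a campaign."""
    if client is None:
        # Imported lazily so the handler can be exercised with a stub client.
        import boto3
        client = boto3.client("personalize-runtime")

    response = client.get_recommendations(
        campaignArn=event["campaign_arn"],
        userId=str(event["user_id"]),  # the API expects string IDs
    )

    # Each entry in itemList is a dict that includes an "itemId" key.
    return {"item_list": response["itemList"]}
```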

In [None]:
lambda_state_get_recommendations = LambdaStep(
    state_id="get recommendations",
    parameters={  
        "FunctionName": "stepfunction_getRecommendations", 
        "Payload": {  
           "campaign_arn": campaign_arn,            
           "user_id": user_id,  
           "item_id": item_id             
        }
    },
    result_path = '$'
)

lambda_state_get_recommendations.add_retry(Retry(
    error_equals=["States.TaskFailed"],
    interval_seconds=5,
    max_attempts=1,
    backoff_rate=4.0
))

lambda_state_get_recommendations.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=Fail("GetRecommendationTaskFailed")
))

#### Create a Succeed State

In [None]:
get_recommendations_workflow_complete = Succeed("WorkflowComplete")

In [None]:
recommendation_path = Chain([ 
    lambda_state_get_recommendations,
    wait_recommendations,
    get_recommendations_workflow_complete
])

### Define, Create, Render, and Execute Recommendation Workflow

In the next cells, we will create a workflow in AWS Step Functions with [create](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.create) and execute it with [execute](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Workflow.execute).

In [None]:
recommendation_workflow = Workflow(
    name="Recommendation_Workflow4",
    definition=recommendation_path,
    role=workflow_execution_role
)

In [None]:
recommendation_workflow.render_graph()

In [None]:
recommendation_workflow_arn = recommendation_workflow.create()
recommendation_workflow_arn

In [None]:
recommendation_workflow_execution = recommendation_workflow.execute()

### Review progress

Review workflow progress with [render_progress](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.render_progress).

Review execution history by calling [list_events](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.list_events) to list all events in the workflow execution.

In [None]:
recommendation_workflow_execution.render_progress()

In [None]:
recommendation_workflow_execution.list_events(html=True)

### Get Recommendations

In [None]:
item_list = recommendation_workflow_execution.get_output()['Payload']['item_list']

print("Recommendations:")
for item in item_list:
    # np.int was removed from recent NumPy releases; the built-in int does the same job
    item_title = items.loc[items['ITEM_ID'] == int(item['itemId'])].values[0][-1]
    print(item_title)

## Clean up Amazon Personalize resources

Make sure to clean up the Amazon Personalize resources and the state machines created in this notebook. Log in to the Amazon Personalize console and delete resources such as dataset groups, datasets, solutions, and campaigns.

## Clean up State Machine resources

In [None]:
Campaign_workflow.delete()

recommendation_workflow.delete()

Main_workflow.delete()

Create_receipe_sol_workflow.delete()

DatasetImport_workflow.delete()

Dataset_workflow.delete()