# Building a Product Recommendation Engine with Amazon Personalize

In this lab, we use Amazon.com’s customer rating data to build a product recommendation plugin for our website. Amazon Personalize trains the recommender model and hosts it for inference. Finally, we test inference and display the items a user rated next to the items recommended for that user.

## Download and prepare sample dataset

In [None]:
!curl -o ./metadata.json.gz http://snap.stanford.edu/data/amazon/productGraph/metadata.json.gz
!curl -o ./ratings.json.gz http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Sports_and_Outdoors_5.json.gz

Data source:
- R. He, J. McAuley. Modeling the visual evolution of fashion trends with one-class collaborative filtering. WWW, 2016
- J. McAuley, C. Targett, J. Shi, A. van den Hengel. Image-based recommendations on styles and substitutes. SIGIR, 2015

In [None]:
!gunzip ./ratings.json.gz
!gunzip ./metadata.json.gz

**Load the ratings into a pandas DataFrame**

In [None]:
import pandas as pd
ratings_df = pd.read_json("./ratings.json", lines=True)
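`lines=True` tells pandas the file is in JSON Lines format: one JSON object per line rather than a single JSON array. A minimal sketch with two made-up review records (the IDs and values are hypothetical) in the same shape as the ratings file:

```python
import io

import pandas as pd

# Two hypothetical review records in JSON Lines format: one JSON object per line.
sample = io.StringIO(
    '{"reviewerID": "U1", "asin": "B001", "overall": 5.0, "unixReviewTime": 1400000000}\n'
    '{"reviewerID": "U2", "asin": "B002", "overall": 2.0, "unixReviewTime": 1400000100}\n'
)

# lines=True parses each line as a separate record, giving one row per review.
sample_df = pd.read_json(sample, lines=True)
print(sample_df)
```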

**Keep only the columns we need**

In [None]:
ratings_df = ratings_df[["reviewerID","asin","overall","unixReviewTime"]]

**This is what the ratings file looks like (first five rows)**

In [None]:
ratings_df.head()

In [None]:
# json and time are used by the schema, polling, and policy cells below.
import json
import time

import boto3

personalize = boto3.client('personalize')
personalize_runtime = boto3.client('personalize-runtime')

## Specify a bucket and data output location

In [None]:
import sagemaker
sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = 'recommendation-engine-with-personalize'
filename = "clean_product_ratings.csv"

**Keep only ratings above 3 (4 and 5 stars), reformat the ratings file as Amazon Personalize input, and upload it to S3**

In [None]:
ratings_df = ratings_df[ratings_df['overall'] > 3]                # keep only products rated 4 and above
ratings_df = ratings_df[['reviewerID', 'asin', 'unixReviewTime']]
ratings_df = ratings_df.rename({'reviewerID':'USER_ID','asin':'ITEM_ID','unixReviewTime':'TIMESTAMP'}, axis='columns')
ratings_df.to_csv(filename, index=False)

boto3.Session().resource('s3').Bucket(bucket).Object("{}/{}".format(prefix,filename)).upload_file(filename)
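The filter-and-rename step above can be checked on a small in-memory frame before it runs on the full dataset. The sketch below uses made-up rows (hypothetical IDs) and applies the same operations: keep ratings above 3, drop the rating column, and rename to the `USER_ID`, `ITEM_ID`, and `TIMESTAMP` headers that the interactions schema expects.

```python
import io

import pandas as pd

# Hypothetical rows mirroring the columns kept from the ratings file.
toy_df = pd.DataFrame({
    "reviewerID": ["U1", "U1", "U2"],
    "asin": ["B001", "B002", "B001"],
    "overall": [5.0, 2.0, 4.0],
    "unixReviewTime": [1400000000, 1400000100, 1400000200],
})

# Same steps as the notebook cell: keep 4- and 5-star ratings, drop the rating,
# and rename columns to the field names used in the Personalize schema.
toy_df = toy_df[toy_df["overall"] > 3]
toy_df = toy_df[["reviewerID", "asin", "unixReviewTime"]]
toy_df = toy_df.rename(
    {"reviewerID": "USER_ID", "asin": "ITEM_ID", "unixReviewTime": "TIMESTAMP"},
    axis="columns",
)

# Write to an in-memory buffer instead of a file to inspect the CSV header and rows.
buffer = io.StringIO()
toy_df.to_csv(buffer, index=False)
csv_text = buffer.getvalue()
print(csv_text)
```

The 2-star row for `U1` is dropped, so only two interactions reach the CSV.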



## Ingest data to Amazon Personalize

**Create user-interactions schema**

In [None]:
schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {
            "name": "USER_ID",
            "type": "string"
        },
        {
            "name": "ITEM_ID",
            "type": "string"
        },
        {
            "name": "TIMESTAMP",
            "type": "long"
        }
    ],
    "version": "1.0"
}

create_schema_response = personalize.create_schema(
    name = "user-products-interaction-schema",
    schema = json.dumps(schema)
)

schema_arn = create_schema_response['schemaArn']
print(json.dumps(create_schema_response, indent=2))
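Personalize expects the header row of the interactions CSV to map to the field names declared in the schema, so a mismatch between the renamed DataFrame columns and the schema is worth catching locally before starting an import job. A minimal sketch; the helper names `schema_field_names` and `compare_columns_to_schema` are hypothetical:

```python
def schema_field_names(schema):
    """Return the field names declared in an Avro-style Personalize schema dict."""
    return [field["name"] for field in schema["fields"]]


def compare_columns_to_schema(columns, schema):
    """Return (missing, extra): schema fields absent from the CSV columns,
    and CSV columns that the schema does not declare."""
    expected = set(schema_field_names(schema))
    actual = set(columns)
    return sorted(expected - actual), sorted(actual - expected)


# Example with a trimmed-down copy of the interactions schema above.
example_schema = {
    "type": "record",
    "name": "Interactions",
    "fields": [
        {"name": "USER_ID", "type": "string"},
        {"name": "ITEM_ID", "type": "string"},
        {"name": "TIMESTAMP", "type": "long"},
    ],
}
missing, extra = compare_columns_to_schema(["USER_ID", "ITEM_ID", "TIMESTAMP"], example_schema)
print(missing, extra)  # [] []
```

In the notebook, `compare_columns_to_schema(ratings_df.columns, schema)` should return two empty lists.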


**Create Dataset Group**

In [None]:
create_dataset_group_response = personalize.create_dataset_group(
    name = "products-rating-dataset-group"
)

dataset_group_arn = create_dataset_group_response['datasetGroupArn']
print(json.dumps(create_dataset_group_response, indent=2))

In [None]:
import time
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_group_response = personalize.describe_dataset_group(
        datasetGroupArn = dataset_group_arn
    )
    status = describe_dataset_group_response["datasetGroup"]["status"]
    print("DatasetGroup: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)
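The same poll-until-terminal loop appears for the dataset group, the import job, the solution version, and the campaign. A sketch of one reusable helper, assuming only that each resource is described by a zero-argument callable and that its status is extracted by a second callable; `wait_for_status` is a hypothetical name, not part of boto3:

```python
import time


def wait_for_status(describe, get_status, timeout_seconds=3 * 60 * 60, poll_seconds=60,
                    terminal=("ACTIVE", "CREATE FAILED")):
    """Poll describe() until get_status(response) is a terminal status or the timeout expires.

    Returns the last status seen, or None if describe() was never called.
    """
    deadline = time.time() + timeout_seconds
    status = None
    while time.time() < deadline:
        status = get_status(describe())
        print("Status: {}".format(status))
        if status in terminal:
            break
        time.sleep(poll_seconds)
    return status


# Hypothetical usage with the notebook's client and ARN:
# wait_for_status(
#     lambda: personalize.describe_dataset_group(datasetGroupArn=dataset_group_arn),
#     lambda r: r["datasetGroup"]["status"],
# )
```

Passing callables keeps the helper independent of the specific `describe_*` call and response key.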

**Create dataset**

In [None]:
dataset_type = "INTERACTIONS"
create_dataset_response = personalize.create_dataset(
    name = "products-rating-user-products-interaction-dataset",
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = schema_arn
)

dataset_arn = create_dataset_response['datasetArn']
print(json.dumps(create_dataset_response, indent=2))

**Attach Policy to S3 Bucket**

In [None]:
s3 = boto3.client("s3")

policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "personalize.amazonaws.com"
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(bucket),
                "arn:aws:s3:::{}/*".format(bucket)
            ]
        }
    ]
}

s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))
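`put_bucket_policy` replaces the bucket's whole policy, so running the cell above on a bucket that already has a policy overwrites it. A hedged sketch of merging the Personalize statement into the existing policy instead. The current policy can be fetched with `s3.get_bucket_policy(Bucket=bucket)["Policy"]`, which raises a `ClientError` with code `NoSuchBucketPolicy` when none exists; the merge itself is a pure function with the hypothetical name `merge_statement`:

```python
import json


def merge_statement(existing_policy_json, statement, policy_id="PersonalizeS3BucketAccessPolicy"):
    """Return a policy JSON string that keeps existing statements and adds
    (or replaces) the statement whose Sid matches statement["Sid"].

    existing_policy_json is the string returned by get_bucket_policy,
    or None when the bucket has no policy yet.
    """
    if existing_policy_json:
        policy = json.loads(existing_policy_json)
    else:
        policy = {"Version": "2012-10-17", "Id": policy_id, "Statement": []}
    statements = policy.get("Statement", [])
    # A policy may hold a single statement object instead of a list; normalize it.
    if isinstance(statements, dict):
        statements = [statements]
    # Drop any earlier copy of this statement so re-running the cell stays idempotent.
    statements = [s for s in statements if s.get("Sid") != statement.get("Sid")]
    statements.append(statement)
    policy["Statement"] = statements
    return json.dumps(policy)


# Hypothetical usage with the `policy` dict defined above:
# from botocore.exceptions import ClientError
# try:
#     current = s3.get_bucket_policy(Bucket=bucket)["Policy"]
# except ClientError as e:
#     if e.response["Error"]["Code"] != "NoSuchBucketPolicy":
#         raise
#     current = None
# s3.put_bucket_policy(Bucket=bucket, Policy=merge_statement(current, policy["Statement"][0]))
```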

**Create Personalize Role**

In [None]:
iam = boto3.client("iam")

role_name = "PersonalizeRoleForRecommendation"
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "personalize.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
    ]
}

create_role_response = iam.create_role(
    RoleName = role_name,
    AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
)

# AmazonPersonalizeFullAccess provides access to any S3 bucket with a name that includes "personalize" or "Personalize" 
# if you would like to use a bucket with a different name, please consider creating and attaching a new policy
# that provides read access to your bucket or attaching the AmazonS3ReadOnlyAccess policy to the role
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess"
iam.attach_role_policy(
    RoleName = role_name,
    PolicyArn = policy_arn
)

time.sleep(60) # wait for a minute to allow IAM role policy attachment to propagate

role_arn = create_role_response["Role"]["Arn"]
print(role_arn)

In [None]:
import json
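# Grant read access to the lab bucket only: the Personalize import needs s3:GetObject
# on the objects and s3:ListBucket on the bucket, not s3:* on every resource.
role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:ListBucket"],
          "Resource": [
              "arn:aws:s3:::{}".format(bucket),
              "arn:aws:s3:::{}/*".format(bucket)
          ]
        }
    ]
}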
response = iam.create_policy(
    PolicyName='PersonalizeToS3',
    PolicyDocument=json.dumps(role_policy_document)
)

response = iam.attach_role_policy(
    RoleName=role_name, PolicyArn=response['Policy']['Arn'])

time.sleep(60) # wait for a minute to allow IAM role policy attachment to propagate

**Create dataset import job**

In [None]:
create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "products-rating-user-products-interaction-dataset-import-job",
    datasetArn = dataset_arn,
    dataSource = {
        "dataLocation": "s3://{}/{}/{}".format(bucket, prefix, filename)
    },
    roleArn = role_arn
)

dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_dataset_import_job_response, indent=2))

**Wait for Dataset Import Job to Have ACTIVE Status**

In [None]:
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = dataset_import_job_arn
    )
    status = describe_dataset_import_job_response["datasetImportJob"]['status']
    print("DatasetImportJob: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)



## Train

**Select Recipe**

In [None]:
list_recipes_response = personalize.list_recipes()
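# aws-hrnn is a legacy recipe; AWS recommends User-Personalization
# (arn:aws:personalize:::recipe/aws-user-personalization) for new work,
# and legacy recipes may not be available in every account.
recipe_arn = "arn:aws:personalize:::recipe/aws-hrnn"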
list_recipes_response
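The cell above hard-codes `recipe_arn` and only displays the listing, so a typo or an unavailable recipe surfaces later as a `create_solution` error. A small sketch that checks the ARN against a `list_recipes` response, whose `recipes` entries carry a `recipeArn` key; the helper names are hypothetical, and only the first page of results is checked (the API paginates with `nextToken`):

```python
def recipe_arns(list_recipes_response):
    """Collect the recipe ARNs from a list_recipes response dict."""
    return [r["recipeArn"] for r in list_recipes_response.get("recipes", [])]


def require_recipe(list_recipes_response, wanted_arn):
    """Return wanted_arn, or raise ValueError if it is not among the listed recipes."""
    available = recipe_arns(list_recipes_response)
    if wanted_arn not in available:
        raise ValueError("Recipe {} not listed; available: {}".format(wanted_arn, available))
    return wanted_arn


# Hypothetical usage in the notebook:
# require_recipe(list_recipes_response, recipe_arn)
```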

**Create and Wait for Solution**

**Create Solution**

In [None]:
create_solution_response = personalize.create_solution(
    name = "products-rating-recommendation-solution",
    datasetGroupArn = dataset_group_arn,
    recipeArn = recipe_arn
)

solution_arn = create_solution_response['solutionArn']
print(json.dumps(create_solution_response, indent=2))

**Create Solution Version**

In [None]:
create_solution_version_response = personalize.create_solution_version(
    solutionArn = solution_arn
)

solution_version_arn = create_solution_version_response['solutionVersionArn']
print(json.dumps(create_solution_version_response, indent=2))

**Wait for Solution Version to Have ACTIVE Status**

In [None]:
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_solution_version_response = personalize.describe_solution_version(
        solutionVersionArn = solution_version_arn
    )
    status = describe_solution_version_response["solutionVersion"]["status"]
    print("SolutionVersion: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

**Get Metrics of Solution**

In [None]:
get_solution_metrics_response = personalize.get_solution_metrics(
    solutionVersionArn = solution_version_arn
)

print(json.dumps(get_solution_metrics_response, indent=2))
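The metrics response contains offline scores at several cutoffs, with key names such as `precision_at_5` and `normalized_discounted_cumulative_gain_at_10`. To make those numbers easier to read, here are the textbook definitions with binary relevance on toy data. This is an illustration only; Personalize's exact evaluation procedure may differ in detail.

```python
import math


def precision_at_k(recommended, relevant, k):
    """Fraction of the top-k recommended items that are relevant."""
    top_k = recommended[:k]
    return sum(1 for item in top_k if item in relevant) / k


def ndcg_at_k(recommended, relevant, k):
    """Normalized discounted cumulative gain with binary relevance.

    A relevant item at 0-based position i contributes 1 / log2(i + 2); the
    total is divided by the best achievable score for this relevant set.
    """
    dcg = sum(1 / math.log2(i + 2) for i, item in enumerate(recommended[:k]) if item in relevant)
    ideal_hits = min(len(relevant), k)
    idcg = sum(1 / math.log2(i + 2) for i in range(ideal_hits))
    return dcg / idcg if idcg > 0 else 0.0


# Toy example: the user later interacted with items "a" and "c".
recommended = ["a", "b", "c", "d"]
relevant = {"a", "c"}
print(precision_at_k(recommended, relevant, 2))        # 0.5
print(round(ndcg_at_k(recommended, relevant, 2), 4))   # 0.6131
```

NDCG rewards placing relevant items near the top: the same hits in the order `["a", "c"]` would score 1.0.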

**Create and Wait for Campaign**

**Create Campaign**

In [None]:
create_campaign_response = personalize.create_campaign(
    name = "products-rating-campaign",
    solutionVersionArn = solution_version_arn,
    minProvisionedTPS = 1
)

campaign_arn = create_campaign_response['campaignArn']
print(json.dumps(create_campaign_response, indent=2))

**Wait for Campaign to Have ACTIVE Status**

In [None]:
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_campaign_response = personalize.describe_campaign(
        campaignArn = campaign_arn
    )
    status = describe_campaign_response["campaign"]["status"]
    print("Campaign: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

## Prepare for inference
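The enrichment helper below queries `{prefix}/clean_product_metadata.json` in S3, but no earlier cell creates that object. The downloaded `metadata.json` from the SNAP Amazon product data stores each product as a Python dict literal (single-quoted), not strict JSON, so it needs converting first. A sketch under that assumption: parse each line with `ast.literal_eval`, keep only products that appear in the filtered ratings, keep the `asin`, `title`, and `imUrl` fields the query selects, and write strict JSON Lines. The function names `clean_metadata` and `write_clean_metadata` are hypothetical.

```python
import ast
import json


def clean_metadata(lines, keep_asins, fields=("asin", "title", "imUrl")):
    """Yield strict-JSON lines for products whose asin is in keep_asins.

    Each input line is assumed to be a Python dict literal; only the
    requested fields that are present in a record are kept.
    """
    for line in lines:
        line = line.strip()
        if not line:
            continue
        record = ast.literal_eval(line)  # parses literals only, unlike eval()
        if record.get("asin") in keep_asins:
            yield json.dumps({f: record[f] for f in fields if f in record})


def write_clean_metadata(src_path, dst_path, keep_asins):
    """Stream the (large) source file line by line and write JSON Lines to dst_path."""
    with open(src_path, encoding="utf-8") as src, open(dst_path, "w", encoding="utf-8") as dst:
        for out_line in clean_metadata(src, keep_asins):
            dst.write(out_line + "\n")


# Hypothetical usage with the notebook's variables:
# keep_asins = set(ratings_df["ITEM_ID"])
# write_clean_metadata("./metadata.json", "clean_product_metadata.json", keep_asins)
# boto3.Session().resource("s3").Bucket(bucket).Object(
#     "{}/clean_product_metadata.json".format(prefix)).upload_file("clean_product_metadata.json")
```

Filtering to the rated products keeps the uploaded file small, which also keeps each S3 query cheap.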

**Define a helper that enriches item IDs with their title and image URL**

In [None]:
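import json
def encrich_with_metadata(products):
    """Look up title and image URL for a list of ASINs with S3 Select."""
    if not products:
        return []
    # Build a parenthesized, quoted SQL list such as ('A1', 'B2'); formatting a
    # Python list directly would produce ['A1', 'B2'] instead.
    in_list = "({})".format(", ".join("'{}'".format(p) for p in products))
    client = boto3.client('s3')
    r = client.select_object_content(
        Bucket=bucket,
        Key="{}/clean_product_metadata.json".format(prefix),
        Expression="SELECT s.imUrl, s.asin, s.title FROM S3Object s WHERE s.asin IN {}".format(in_list),
        ExpressionType='SQL',
        RequestProgress={
            'Enabled': False
        },
        InputSerialization={
            'JSON': {
                'Type': 'LINES'
            }
        },
        OutputSerialization={
            'JSON': {
                'RecordDelimiter': '\n'
            },
        }
    )
    # A Records event can end mid-record, so join all chunks before splitting into lines.
    payload = b"".join(
        event['Records']['Payload'] for event in r['Payload'] if 'Records' in event
    )
    lines = payload.decode('utf-8').strip().split("\n")
    return [json.loads(line) for line in lines if line]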
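`select_object_content` depends on Amazon S3 Select, which AWS stopped offering to new customers in 2024. If it is unavailable in your account, one alternative is to load the cleaned JSON Lines metadata locally with pandas and look items up by ASIN, returning the same list-of-dicts shape. A sketch assuming the file holds `asin`, `title`, and `imUrl` fields; `load_metadata_index` and `enrich_locally` are hypothetical names:

```python
import pandas as pd


def load_metadata_index(path):
    """Load a JSON Lines metadata file (path or file-like object) indexed by asin."""
    # Read asin as a string so IDs such as "0000031852" keep their leading zeros.
    meta = pd.read_json(path, lines=True, dtype={"asin": str})
    return meta.drop_duplicates("asin").set_index("asin")


def enrich_locally(products, meta_index):
    """Return one dict per product found in meta_index, in the order of products.

    Unknown ASINs are skipped; missing fields come back as None.
    """
    found = [p for p in products if p in meta_index.index]
    rows = meta_index.loc[found].reset_index()
    rows = rows.astype(object).where(rows.notna(), None)
    return rows.to_dict("records")


# Hypothetical usage after downloading the cleaned file:
# meta_index = load_metadata_index("clean_product_metadata.json")
# recommended_items = enrich_locally(recommended_item_list, meta_index)
```

Loading once and indexing by ASIN makes each lookup an in-memory operation instead of an S3 request.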

**Get recommendations and enrich the recommended items with titles and images**

In [None]:
# Change user_id to test recommendations for a different user
user_id = "AIXZKN4ACSKI"

get_recommendations_response = personalize_runtime.get_recommendations(
    campaignArn = campaign_arn,
    userId = str(user_id),
    numResults=15
)

recommended_item_list = list(map(lambda x: x['itemId'], get_recommendations_response['itemList']))
recommended_items = encrich_with_metadata(recommended_item_list)

**Get the items this user actually rated above 3 and enrich them with title and image URL**

In [None]:
actual_item_list = list(ratings_df[ratings_df["USER_ID"] == user_id]['ITEM_ID'])
actual_items = encrich_with_metadata(actual_item_list)

**These are the items the user actually rated 4 or 5 stars**

In [None]:
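from html import escape
from IPython.display import HTML
image_string = ""
for item in actual_items:
    # Some products lack a title or image URL in the metadata; fall back to the ASIN / empty src.
    title = escape(item.get('title', item.get('asin', '')))
    image_string += '<figure style="float:left;"><img src="{}" alt="{}" width="200"/><figcaption><center>{}</center></figcaption></figure>'.format(escape(item.get('imUrl', '')), title, title)
HTML(data=image_string)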

**These are the recommended items**

In [None]:
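from html import escape
from IPython.display import HTML
image_string = ""
for item in recommended_items:
    # Some products lack a title or image URL in the metadata; fall back to the ASIN / empty src.
    title = escape(item.get('title', item.get('asin', '')))
    image_string += '<figure style="float:left;"><img src="{}" alt="{}" width="200"/><figcaption><center>{}</center></figcaption></figure>'.format(escape(item.get('imUrl', '')), title, title)
HTML(data=image_string)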
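An active campaign continues to bill for its provisioned throughput (`minProvisionedTPS = 1` above) until it is deleted. Personalize deletions are asynchronous, and resources must go in dependency order: campaign, solution, dataset, dataset group, then schema. A sketch of a helper that issues one delete and polls the matching `describe_*` call until it raises a not-found error; `delete_and_wait` is a hypothetical name, and the usage assumes the client's modeled `ResourceNotFoundException`:

```python
import time


def delete_and_wait(delete, describe, not_found_error, poll_seconds=30, timeout_seconds=3600):
    """Call delete(), then poll describe() until it raises not_found_error.

    Returns True once the resource is gone, or False if the timeout expires first.
    """
    delete()
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        try:
            describe()
        except not_found_error:
            return True
        time.sleep(poll_seconds)
    return False


# Hypothetical usage with the notebook's client and ARNs (repeat in this order for the
# solution, dataset, dataset group, and schema using the matching delete_*/describe_* calls):
# nf = personalize.exceptions.ResourceNotFoundException
# delete_and_wait(lambda: personalize.delete_campaign(campaignArn=campaign_arn),
#                 lambda: personalize.describe_campaign(campaignArn=campaign_arn), nf)
```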