# Building a Product Recommendation Engine with Amazon Personalize

In this lab, we use Amazon.com customer rating data to build a product recommendation plugin for our website. We use Amazon Personalize to train the recommender model and to host recommendation inference. We then test the inference and display both the items that a user rated and the items that are recommended for that user.

## Download and prepare sample dataset

In [None]:
!curl -o ./metadata.json.gz http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_AMAZON_FASHION.json.gz
!curl -o ./ratings.json.gz http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/AMAZON_FASHION.json.gz

Data by:    
Justifying recommendations using distantly-labeled reviews and fine-grained aspects. Jianmo Ni, Jiacheng Li, Julian McAuley. Empirical Methods in Natural Language Processing (EMNLP), 2019.

In [None]:
!gunzip -f ratings.json.gz
!gunzip -f metadata.json.gz

**Load the ratings into a pandas DataFrame**

In [None]:
import pandas as pd 
ratings_df = pd.read_json("./ratings.json", lines=True)

In [None]:
ratings_df.head()

**Keep only the columns we need**

In [None]:
ratings_df = ratings_df[["reviewerID","asin","overall","unixReviewTime"]]

In [None]:
ratings_df.head()

**Create the Personalize clients, then rename the columns to the field names Amazon Personalize expects**

In [None]:
import boto3

personalize = boto3.client('personalize')
personalize_runtime = boto3.client('personalize-runtime')

In [None]:
ratings_df = ratings_df.rename(columns={'reviewerID':'USER_ID', 'asin':'ITEM_ID', 'unixReviewTime':'TIMESTAMP', 'overall':'EVENT_VALUE'})

In [None]:
ratings_df['EVENT_TYPE'] = "reviewed"
ratings_df['EVENT_VALUE'] = ratings_df['EVENT_VALUE'].astype(float)

In [None]:
ratings_df.head()

## Specify a bucket and data output location

In [None]:
import sagemaker
sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = 'recommendation-engine-with-personalize-fashion'
filename = "clean_product_ratings.csv"

**Write the ratings file in the format Amazon Personalize expects and upload it to S3 (the rating threshold is applied later, through `eventValueThreshold` in the solution configuration)**

In [None]:
ratings_df.to_csv(filename, index=False)

boto3.Session().resource('s3').Bucket(bucket).Object("{}/{}".format(prefix,filename)).upload_file(filename)



## Ingest data to Amazon Personalize

**Specify naming and threshold**

In [None]:
base_name = "fashion"
iteration = "2"
review_star_threshold = 4.0

**Create user-interactions schema**

In [None]:
import json
schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {
            "name": "USER_ID",
            "type": "string"
        },
        {
            "name": "ITEM_ID",
            "type": "string"
        },
        {
            "name": "TIMESTAMP",
            "type": "long"
        },
        {
            "name": "EVENT_TYPE",
            "type": "string"
        },
        {
            "name": "EVENT_VALUE",
            "type": "float"
        },
    ],
    "version": "1.0"
}

create_schema_response = personalize.create_schema(
    name = "{}-{}-interaction-schema".format(base_name, iteration),
    schema = json.dumps(schema)
)

schema_arn = create_schema_response['schemaArn']
print(json.dumps(create_schema_response, indent=2))
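The import job expects the CSV header to carry the field names declared in the schema. As a minimal sketch, a check like the one below could be run before uploading; `check_columns_against_schema` is a hypothetical helper, not part of boto3 or Amazon Personalize, and the column list is typed out by hand here rather than read from `ratings_df`.

```python
def check_columns_against_schema(schema, columns):
    """Return (missing, extra) column names relative to the schema's fields."""
    expected = [field["name"] for field in schema["fields"]]
    missing = [name for name in expected if name not in columns]
    extra = [name for name in columns if name not in expected]
    return missing, extra


# Same field names as the interactions schema defined above.
interactions_schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
        {"name": "USER_ID", "type": "string"},
        {"name": "ITEM_ID", "type": "string"},
        {"name": "TIMESTAMP", "type": "long"},
        {"name": "EVENT_TYPE", "type": "string"},
        {"name": "EVENT_VALUE", "type": "float"},
    ],
    "version": "1.0",
}

# Column names as they appear in the ratings data frame after renaming.
csv_columns = ["USER_ID", "ITEM_ID", "EVENT_VALUE", "TIMESTAMP", "EVENT_TYPE"]

missing, extra = check_columns_against_schema(interactions_schema, csv_columns)
print("missing:", missing, "extra:", extra)
```

In the notebook, `list(ratings_df.columns)` could be passed in place of `csv_columns`.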


**Create Dataset Group**

In [None]:
create_dataset_group_response = personalize.create_dataset_group(
    name = "{}-{}-dataset-group".format(base_name, iteration)
)

dataset_group_arn = create_dataset_group_response['datasetGroupArn']
print(json.dumps(create_dataset_group_response, indent=2))

In [None]:
import time
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_group_response = personalize.describe_dataset_group(
        datasetGroupArn = dataset_group_arn
    )
    status = describe_dataset_group_response["datasetGroup"]["status"]
    print("DatasetGroup: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)
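The same polling pattern recurs throughout this notebook (dataset group, import job, solution versions, campaigns, filter). As a sketch only, it could be factored into one helper; `wait_for_status` is a hypothetical name, and the statuses fed to it below are stand-ins rather than output from a real Personalize call.

```python
import time


def wait_for_status(describe, terminal=("ACTIVE", "CREATE FAILED"),
                    poll_seconds=60, timeout_seconds=3 * 60 * 60):
    """Call describe() until it returns a terminal status or the timeout passes.

    describe is any zero-argument callable that returns the current status string.
    Returns the last status seen (None if describe was never called).
    """
    deadline = time.time() + timeout_seconds
    status = None
    while time.time() < deadline:
        status = describe()
        print("Status: {}".format(status))
        if status in terminal:
            break
        time.sleep(poll_seconds)
    return status


# Stand-in for a real describe call: yields made-up statuses in order.
fake_statuses = iter(["CREATE PENDING", "CREATE IN_PROGRESS", "ACTIVE"])
final_status = wait_for_status(lambda: next(fake_statuses), poll_seconds=0)
```

For the dataset group, the callable could be `lambda: personalize.describe_dataset_group(datasetGroupArn=dataset_group_arn)["datasetGroup"]["status"]`. Returning the final status also lets the caller stop early when a resource ends in `CREATE FAILED` instead of continuing to the next step.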

**Create dataset**

In [None]:
dataset_type = "INTERACTIONS"
create_dataset_response = personalize.create_dataset(
    name = "{}-{}-interaction-dataset".format(base_name, iteration),
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = schema_arn
)

dataset_arn = create_dataset_response['datasetArn']
print(json.dumps(create_dataset_response, indent=2))

**Attach Policy to S3 Bucket**

In [None]:
s3 = boto3.client("s3")

policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy{}{}".format(base_name, iteration),
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "personalize.amazonaws.com"
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(bucket),
                "arn:aws:s3:::{}/*".format(bucket)
            ]
        }
    ]
}

s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))

**Create Personalize Role**

In [None]:
iam = boto3.client("iam")

role_name = "PersonalizeRoleForRecommendation{}{}".format(base_name, iteration)
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "personalize.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
    ]
}

create_role_response = iam.create_role(
    RoleName = role_name,
    AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
)

# AmazonPersonalizeFullAccess provides access to any S3 bucket with a name that includes "personalize" or "Personalize" 
# if you would like to use a bucket with a different name, please consider creating and attaching a new policy
# that provides read access to your bucket or attaching the AmazonS3ReadOnlyAccess policy to the role
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess"
iam.attach_role_policy(
    RoleName = role_name,
    PolicyArn = policy_arn
)

time.sleep(60) # wait for a minute to allow IAM role policy attachment to propagate

role_arn = create_role_response["Role"]["Arn"]
print(role_arn)

In [None]:
import json
role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Action": "s3:*",
          "Resource":"*"
        }
    ]
}
response = iam.create_policy(
    PolicyName='PersonalizeToS3{}{}'.format(base_name, iteration),
    PolicyDocument=json.dumps(role_policy_document)
)

response = iam.attach_role_policy(
    RoleName=role_name, PolicyArn=response['Policy']['Arn'])

time.sleep(60) # wait for a minute to allow IAM role policy attachment to propagate

**Create dataset import job**

In [None]:
create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "{}-{}-interaction-import-job".format(base_name, iteration),
    datasetArn = dataset_arn,
    dataSource = {
        "dataLocation": "s3://{}/{}/{}".format(bucket, prefix, filename)
    },
    roleArn = role_arn
)

dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_dataset_import_job_response, indent=2))

**Wait for Dataset Import Job to Have ACTIVE Status**

In [None]:
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = dataset_import_job_arn
    )
    status = describe_dataset_import_job_response["datasetImportJob"]['status']
    print("DatasetImportJob: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)



## Train

**Select Recipe**

In [None]:
list_recipes_response = personalize.list_recipes()
recipe_arn_up = "arn:aws:personalize:::recipe/aws-user-personalization"
recipe_arn_hrnn = "arn:aws:personalize:::recipe/aws-hrnn"

**Define Solution Configuration**

In [None]:

solution_config = {
        "eventValueThreshold": str(review_star_threshold),
        "featureTransformationParameters": {
            "min_user_history_length_percentile" : "0.0", # raise to drop users with very short histories (they mostly reflect item popularity)
            "max_user_history_length_percentile" : "0.99", # drops the 1% of users with the longest histories
        },
        "algorithmHyperParameters": {
            "hidden_dimension" : "159"
        },
        "hpoConfig": {
            "algorithmHyperParameterRanges": {
                'integerHyperParameterRanges': [
                    {
                        'name': 'bptt', # back-propagation-through-time steps; lower values give less credit to long-term history
                        'minValue': 3,
                        'maxValue': 10
                    },
                ],
                "continuousHyperParameterRanges": [],
                'categoricalHyperParameterRanges': [
                    #{
                    #    'name': 'recency_mask', # set to false for discarding recency factor of purchases
                    #    'values': ['true','false']
                    #},
                ],
            },
            "hpoResourceConfig": {
                "maxNumberOfTrainingJobs": "40",
                "maxParallelTrainingJobs": "10"
            }
        }
    }

**Create Solutions for the User-Personalization and HRNN Recipes**

In [None]:
create_solution_response_up = personalize.create_solution(
    name = "{}-{}-up-solution2".format(base_name, iteration),
    datasetGroupArn = dataset_group_arn,
    recipeArn = recipe_arn_up,
    eventType = 'reviewed',
    performHPO = True,
    solutionConfig = solution_config
)

create_solution_response_hrnn = personalize.create_solution(
    name = "{}-{}-hrnn-solution2".format(base_name, iteration),
    datasetGroupArn = dataset_group_arn,
    recipeArn = recipe_arn_hrnn,
    eventType = 'reviewed',
    performHPO = True,
    solutionConfig = solution_config
)

solution_arn_up = create_solution_response_up['solutionArn']
solution_arn_hrnn = create_solution_response_hrnn['solutionArn']
print(json.dumps(create_solution_response_up, indent=2))
print(json.dumps(create_solution_response_hrnn, indent=2))

**Create Solution Version**

In [None]:
create_solution_version_response_up = personalize.create_solution_version(
    solutionArn = solution_arn_up
)

create_solution_version_response_hrnn = personalize.create_solution_version(
    solutionArn = solution_arn_hrnn
)

solution_version_arn_up = create_solution_version_response_up['solutionVersionArn']
solution_version_arn_hrnn = create_solution_version_response_hrnn['solutionVersionArn']
print(json.dumps(create_solution_version_response_up, indent=2))
print(json.dumps(create_solution_version_response_hrnn, indent=2))

**Wait for Solution Version to Have ACTIVE Status**

In [None]:
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_solution_version_response = personalize.describe_solution_version(
        solutionVersionArn = solution_version_arn_up
    )
    status = describe_solution_version_response["solutionVersion"]["status"]
    print("SolutionVersion: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

In [None]:
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_solution_version_response = personalize.describe_solution_version(
        solutionVersionArn = solution_version_arn_hrnn
    )
    status = describe_solution_version_response["solutionVersion"]["status"]
    print("SolutionVersion: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

**Get Metrics of Solution**

In [None]:
get_solution_metrics_response_up = personalize.get_solution_metrics(
    solutionVersionArn = solution_version_arn_up
)
get_solution_metrics_response_hrnn = personalize.get_solution_metrics(
    solutionVersionArn = solution_version_arn_hrnn
)

print(json.dumps(get_solution_metrics_response_up, indent=2))
print(json.dumps(get_solution_metrics_response_hrnn, indent=2))
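The two JSON dumps can be hard to compare by eye. The sketch below lines up the metrics that both solution versions report; `compare_metrics` is a hypothetical helper, the numbers are made up for illustration, and it assumes that a higher value is better for every metric it is given.

```python
def compare_metrics(metrics_a, metrics_b, label_a="up", label_b="hrnn"):
    """Return (metric, value_a, value_b, better_label) rows for shared metrics.

    Assumes higher is better for every metric; rows are sorted by metric name.
    """
    rows = []
    for name in sorted(set(metrics_a) & set(metrics_b)):
        a, b = metrics_a[name], metrics_b[name]
        if a > b:
            better = label_a
        elif b > a:
            better = label_b
        else:
            better = "tie"
        rows.append((name, a, b, better))
    return rows


# Made-up values, not results from this notebook.
example_up = {"coverage": 0.12, "precision_at_5": 0.010,
              "mean_reciprocal_rank_at_25": 0.05}
example_hrnn = {"coverage": 0.08, "precision_at_5": 0.012,
                "mean_reciprocal_rank_at_25": 0.05}

rows = compare_metrics(example_up, example_hrnn)
for name, a, b, better in rows:
    print("{:<30} up={:<8} hrnn={:<8} better={}".format(name, a, b, better))
```

With the real responses, the inputs would be `get_solution_metrics_response_up["metrics"]` and `get_solution_metrics_response_hrnn["metrics"]`.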

## Deploy

**Create Campaign**

In [None]:
create_campaign_response_up = personalize.create_campaign(
    name = "{}-{}-campaign-up".format(base_name, iteration),
    solutionVersionArn = solution_version_arn_up,
    minProvisionedTPS = 1
)
create_campaign_response_hrnn = personalize.create_campaign(
    name = "{}-{}-campaign-hrnn".format(base_name, iteration),
    solutionVersionArn = solution_version_arn_hrnn,
    minProvisionedTPS = 1
)

campaign_arn_up = create_campaign_response_up['campaignArn']
campaign_arn_hrnn = create_campaign_response_hrnn['campaignArn']
print(json.dumps(create_campaign_response_up, indent=2))
print(json.dumps(create_campaign_response_hrnn, indent=2))


**Wait for Campaign to Have ACTIVE Status**

In [None]:
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_campaign_response = personalize.describe_campaign(
        campaignArn = campaign_arn_up
    )
    status = describe_campaign_response["campaign"]["status"]
    print("Campaign: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

In [None]:
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_campaign_response = personalize.describe_campaign(
        campaignArn = campaign_arn_hrnn
    )
    status = describe_campaign_response["campaign"]["status"]
    print("Campaign: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

## Upload product metadata

In [None]:
filename = 'metadata.json'
boto3.Session().resource('s3').Bucket(bucket).Object("{}/{}".format(prefix,filename)).upload_file(filename)

## Prepare for inference

**Define a function that enriches item information with title and image URL**

In [None]:
import json
def encrich_with_metadata(products):
    client = boto3.client('s3')
    r = client.select_object_content(
        Bucket=bucket,
        Key="{}/metadata.json".format(prefix),
        Expression="SELECT s.image, s.asin, s.title FROM S3Object s WHERE s.asin IN {}".format(products),
        #Expression="SELECT s.imUrl, s.asin, s.title FROM S3Object s WHERE s.asin IN {}".format(products),
        ExpressionType='SQL',
        RequestProgress={
            'Enabled': False
        },
        InputSerialization={
            'JSON': {
                'Type': 'LINES'
            }
        },
        OutputSerialization={
            'JSON':{
                'RecordDelimiter': '\n',
            }
        },
    )
    output = []
    for event in r['Payload']:
        if 'Records' in event:
            recs = event['Records']['Payload'].decode('utf-8').strip().split("\n")
            recs = list(map(lambda x: json.loads(x), recs))
            output += recs
    return output
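The function above formats a Python list straight into the `IN` clause, which produces square brackets and Python-style quoting. Standard SQL writes an `IN` list in parentheses, so if the query is rejected, one option is to build the list explicitly. This is a sketch under that assumption; `sql_in_list` is a hypothetical helper and the ASINs are placeholders.

```python
def sql_in_list(values):
    """Format values as a parenthesized list of single-quoted SQL strings."""
    if not values:
        raise ValueError("at least one value is required")
    # Double any embedded single quote, as SQL string literals require.
    quoted = ["'{}'".format(str(value).replace("'", "''")) for value in values]
    return "({})".format(", ".join(quoted))


# Placeholder ASINs for illustration.
asins = ["B000000001", "B000000002"]
expression = (
    "SELECT s.image, s.asin, s.title FROM S3Object s WHERE s.asin IN {}"
    .format(sql_in_list(asins))
)
print(expression)
```

Raising on an empty list avoids sending a query with an empty `IN ()` clause, for example when a campaign returns no items.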

**Build a filter to exclude items that the user has already purchased and reviewed**

In [None]:
filter_response = personalize.create_filter(
    name='{}-{}-exclude-purchases'.format(base_name, iteration),
    datasetGroupArn= dataset_group_arn,
    filterExpression= 'EXCLUDE ItemID WHERE Interactions.event_type IN ("reviewed")'
)

In [None]:
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_filter_response = personalize.describe_filter(
        filterArn = filter_response['filterArn']
    )
    status = describe_filter_response["filter"]["status"]
    print("Filter: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(60)

**Get a sample user to test**

In [None]:
# Pick a user who has a considerable number of reviews
#user_id = ratings_df[ratings_df['EVENT_VALUE'] >= review_star_threshold]['USER_ID'].value_counts()[:2000].index.to_list()[1995]
user_id = ratings_df[ratings_df['EVENT_VALUE'] >= review_star_threshold]['USER_ID'].value_counts()[:2000].index.to_list()[20]

**Define a function that renders items as HTML for display**

In [None]:
import re
def display_items(items):
    image_string = ""
    i = 1
    for item in items:
        if 'score' in item:
            caption = "{}---Score:{}---Name: {}---ASIN:{}".format(str(i),item['score'], item['title'],item['asin'])
        else:
            caption = "{}---Name: {}---ASIN:{}".format(str(i),item['title'],item['asin'])
        if len(item['image']) > 0:
            image = item['image'][0]
            image = re.sub(r'SR..,..','SR200,200',image)
            image = re.sub(r'US..','US200',image)
            image = re.sub(r'SS..','SS200',image)
            image = re.sub(r'SX..','SX200',image)
            image = re.sub(r'SY..','SX200',image)
            image = re.sub(r'CR,0,0,..,..','CR,0,0,200,200',image)
            image_string += '<figure style="float:left;"><img src="{}" alt="" width="1"/><figcaption ><center>{}</center></figcaption></figure></br>'.format(image,caption)
        else:
            image_string += '<figure style="float:left;"><img src="" alt="" width="1"/><figcaption ><center>{}</center></figcaption></figure></br>'.format(caption)
        i = i+1
    return image_string
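`display_items` rewrites the size tokens in the image URLs with two-character wildcard patterns, so it assumes two-character sizes. As an alternative sketch, a single pattern anchored on the surrounding underscores can handle sizes of any digit count; `resize_image_url` is a hypothetical helper, the URLs are made up, and unlike the code above it leaves each prefix (including `SY`) unchanged and does not touch `CR` crop tokens.

```python
import re

# A size token is a known prefix plus digits (SR carries "width,height"),
# sitting between two underscores in the file name.
SIZE_TOKEN = re.compile(r"(?<=_)(SR|US|SS|SX|SY)\d+(?:,\d+)?(?=_)")


def resize_image_url(url, size=200):
    """Rewrite every size token in url to the requested size."""
    def replace(match):
        prefix = match.group(1)
        if prefix == "SR":
            return "SR{0},{0}".format(size)
        return "{}{}".format(prefix, size)
    return SIZE_TOKEN.sub(replace, url)


# Made-up URLs that follow the token layout assumed above.
print(resize_image_url("https://example.com/images/I/abc._SR38,50_.jpg"))
print(resize_image_url("https://example.com/images/I/abc._SX38_SY50_.jpg"))
```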

**Get the items that the user actually reviewed with a rating of at least 4 (that is, > 3) and enrich them with title and image URL**

In [None]:
actual_item_list = list(ratings_df[(ratings_df["USER_ID"] == user_id) & (ratings_df['EVENT_VALUE'] >= review_star_threshold)]['ITEM_ID'])
actual_items = encrich_with_metadata(actual_item_list)

In [None]:
from IPython.display import HTML
HTML(data=display_items(actual_items))

**Get recommendations from the User-Personalization campaign and enrich the recommended items with title and images**


In [None]:
get_recommendations_response_up = personalize_runtime.get_recommendations(
    campaignArn = campaign_arn_up,
    userId = str(user_id),
    numResults=10,
    filterArn=filter_response['filterArn']
)

recommended_item_list = list(map(lambda x: x['itemId'], get_recommendations_response_up['itemList']))
print(get_recommendations_response_up['itemList'])

    

**Prepare for display**

In [None]:
recommended_items = encrich_with_metadata(recommended_item_list)
items_dictionary = {}
for item in get_recommendations_response_up['itemList']:
    items_dictionary[item['itemId']]=item['score']                                           
for item in recommended_items:
    item['score'] = items_dictionary[item['asin']]
recommended_items.sort(key=lambda x: x['score'], reverse=True)
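The lookup above assumes that every record returned by the metadata query has a matching score, and it silently drops recommended items that have no metadata. A sketch of a variant that reports those items is shown below; `attach_scores` is a hypothetical helper and the records are made up.

```python
def attach_scores(metadata_records, item_list):
    """Merge recommendation scores into metadata records.

    Returns (enriched, missing): enriched is sorted by score, highest first;
    missing lists recommended item IDs that had no metadata record.
    """
    scores = {entry["itemId"]: entry["score"] for entry in item_list}
    enriched = [
        dict(record, score=scores[record["asin"]])
        for record in metadata_records
        if record["asin"] in scores
    ]
    enriched.sort(key=lambda record: record["score"], reverse=True)
    found = {record["asin"] for record in metadata_records}
    missing = [item_id for item_id in scores if item_id not in found]
    return enriched, missing


# Made-up data shaped like the itemList and metadata records used above.
example_item_list = [
    {"itemId": "A1", "score": 0.2},
    {"itemId": "A2", "score": 0.5},
    {"itemId": "A3", "score": 0.1},
]
example_metadata = [
    {"asin": "A1", "title": "Scarf", "image": []},
    {"asin": "A2", "title": "Hat", "image": []},
]

enriched, missing = attach_scores(example_metadata, example_item_list)
print([record["asin"] for record in enriched], missing)
```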

**These are the items recommended by the User-Personalization campaign**

In [None]:
from IPython.display import HTML
HTML(data=display_items(recommended_items))

**Get recommendations from the HRNN campaign and enrich the recommended items with title and images**


In [None]:
get_recommendations_response_hrnn = personalize_runtime.get_recommendations(
    campaignArn = campaign_arn_hrnn,
    userId = str(user_id),
    numResults=10,
    filterArn=filter_response['filterArn']
)

recommended_item_list = list(map(lambda x: x['itemId'], get_recommendations_response_hrnn['itemList']))
print(get_recommendations_response_hrnn['itemList'])

    

**Prepare for display**

In [None]:
recommended_items = encrich_with_metadata(recommended_item_list)
items_dictionary = {}
for item in get_recommendations_response_hrnn['itemList']:
    items_dictionary[item['itemId']]=item['score']                                           
for item in recommended_items:
    item['score'] = items_dictionary[item['asin']]
recommended_items.sort(key=lambda x: x['score'], reverse=True)

**These are the items recommended by the HRNN campaign**

In [None]:
from IPython.display import HTML
HTML(data=display_items(recommended_items))