# Personalizeで未学習のユーザに対するレコメンドの検証

In [None]:
import pandas as pd
import numpy as np
import json
from os import path
import boto3
from datetime import datetime
import time
import pprint
# Shared pretty-printer (2-space indent) for interactive inspection; not referenced by the cells shown here.
pp = pprint.PrettyPrinter(indent=2)


In [None]:
# Timestamp suffix used to keep dataset-import-job names unique per run.
current_dt = datetime.now().strftime('%Y%m%d-%H%M%S')
# TODO: fill in the S3 bucket that holds the input/output files.
bucket_name = ''
prefix = 'test-untrained-user'
s3_prefix = f'personalize-work/{prefix}'
# S3 keys and URIs always use '/', so they are built explicitly rather than with
# os.path.join (which would insert '\' separators on Windows).
user_ids_json_s3_path = f'{s3_prefix}/user_ids.jsonl'
# Placeholder: replace {account_id} with your AWS account ID
# (this is a plain string, not an f-string, so it is NOT substituted automatically).
role_arn = 'arn:aws:iam::{account_id}:role/service-role/AmazonPersonalize-ExecutionRole'
data_locations = {
    'Users': f's3://{bucket_name}/{s3_prefix}/users.csv',
    'Items': f's3://{bucket_name}/{s3_prefix}/items.csv',
    'Interactions': f's3://{bucket_name}/{s3_prefix}/interactions.csv',
}

In [None]:
# A single boto3 session backs both the Personalize client and the S3 bucket handle.
_aws_session = boto3.Session()
personalize = _aws_session.client('personalize')
bucket = _aws_session.resource('s3').Bucket(bucket_name)

In [None]:
!wget -N http://files.grouplens.org/datasets/movielens/ml-100k.zip
!unzip -o ml-100k.zip
df = pd.read_csv('./ml-100k/u.data', sep='\t', names=['USER_ID', 'ITEM_ID', 'RATING', 'TIMESTAMP'])
df

In [None]:
# Load the user table, index it by USER_ID and keep only AGE and GENDER as user metadata.
users = (
    pd.read_csv(
        './ml-100k/u.user',
        sep='|',
        names=['USER_ID', 'AGE', 'GENDER', 'OCCUPATION', 'ZIP_CODE'],
        encoding='latin-1',
    )
    .set_index('USER_ID')
    .loc[:, ['AGE', 'GENDER']]
)
users

In [None]:
# Load the movie table: descriptive columns followed by genre indicator columns.
items = pd.read_csv(
    './ml-100k/u.item',
    sep='|',
    names=[
        'ITEM_ID', 'TITLE', 'RELEASE_DATE', 'VIDEO_RELEASE_DATE', 'IMDB_URL',
        'UNKNOWN', 'ACTION', 'ADVENTURE', 'ANIMATION', "CHILDREN'S", 'COMEDY',
        'CRIME', 'DOCUMENTARY', 'DRAMA', 'FANTASY', 'FILM-NOIR', 'HORROR',
        'MUSICAL', 'MYSTERY', 'ROMANCE', 'SCI-FI', 'THRILLER', 'WAR', 'WESTERN',
    ],
    encoding='latin-1',
)
items = items.set_index('ITEM_ID')
def extract_genre(row):
    """Return the '|'-joined names of the genre indicator columns equal to 1 in *row*.

    *row* is one row of the raw u.item table after ``set_index('ITEM_ID')``:
    its labels are TITLE, RELEASE_DATE, VIDEO_RELEASE_DATE and IMDB_URL,
    followed by the genre indicator columns UNKNOWN ... WESTERN.

    Returns:
        e.g. ``'ACTION|COMEDY'``; an empty string if no genre flag is set.
    """
    # Select the genre flags by label. The former positional ``row[5:]`` started
    # one column too late (ITEM_ID is already the index, so position 5 is ACTION),
    # silently dropping UNKNOWN and giving a movie flagged only UNKNOWN an empty GENRE.
    genre_flags = row.loc['UNKNOWN':'WESTERN']
    return '|'.join(genre for genre, flag in genre_flags.items() if flag == 1)
# Collapse the genre flags into a single GENRE string, keep only TITLE/GENRE,
# and attach the number of interactions recorded for each movie.
items = items.assign(GENRE=items.apply(extract_genre, axis=1)).loc[:, ['TITLE', 'GENRE']]
item_watch_count = (
    df.groupby('ITEM_ID')
    .size()
    .sort_values(ascending=False)
    .rename('watch_ct')
)
items = items.join(item_watch_count)
items

In [None]:
# Add the user's genre of interest at rating time to the interaction data
# (one genre picked at random from the rated movie's genres).
df['GENRE_PREFERENCE'] = (
    df['ITEM_ID']
    .map(items['GENRE'])
    .str.split('|')
    .apply(lambda genres: genres[np.random.randint(len(genres))])
)

In [None]:
# Upload each dataset to its S3 location as CSV.
users.to_csv(path_or_buf=data_locations['Users'])
# Item metadata is restricted to the genre only.
items.loc[:, 'GENRE'].to_csv(path_or_buf=data_locations['Items'])
df.to_csv(path_or_buf=data_locations['Interactions'], index=False)

# Personalizeのセットアップ

In [None]:
# Create the dataset group and poll every 60 seconds (for up to 3 hours)
# until it reaches a terminal status.
create_dataset_group_response = personalize.create_dataset_group(
    name=prefix
)
dataset_group_arn = create_dataset_group_response['datasetGroupArn']

max_time = time.time() + 3 * 60 * 60  # 3 hours
while time.time() < max_time:
    describe_dataset_group_response = personalize.describe_dataset_group(
        datasetGroupArn=dataset_group_arn
    )
    status = describe_dataset_group_response['datasetGroup']['status']
    if status == 'ACTIVE' or status == 'CREATE FAILED':
        print(status)
        if status == 'CREATE FAILED':
            print(describe_dataset_group_response['datasetGroup'].get('failureReason', ''))
        break

    time.sleep(60)
    print('.', end='')
else:
    # Fail loudly instead of letting later cells run against a group that never became ready.
    raise TimeoutError(f'{dataset_group_arn} did not reach a terminal status within 3 hours')

In [None]:
# Field definitions for each dataset's Avro schema.
# Reference: https://docs.aws.amazon.com/personalize/latest/dg/how-it-works-dataset-schema.html
def _field(name, avro_type, categorical=False):
    """Build one Avro field entry; adds ``'categorical': True`` when requested."""
    spec = {'name': name, 'type': avro_type}
    if categorical:
        spec['categorical'] = True
    return spec


field_definitions = {
    'Interactions': [
        _field('USER_ID', 'string'),
        _field('ITEM_ID', 'string'),
        _field('RATING', 'int'),
        _field('TIMESTAMP', 'long'),
        _field('GENRE_PREFERENCE', 'string', categorical=True),
    ],
    'Users': [
        _field('USER_ID', 'string'),
        _field('AGE', 'int'),
        _field('GENDER', 'string', categorical=True),
    ],
    'Items': [
        _field('ITEM_ID', 'string'),
        _field('GENRE', 'string', categorical=True),
    ],
}


In [None]:
dataset_types = ['Interactions', 'Users', 'Items']
dataset_import_job_arns = []
for dataset_type in dataset_types:
    resource_name = f'{prefix}-{dataset_type}'

    # Register the Avro schema for this dataset type.
    avro_schema = {
        'type': 'record',
        'name': dataset_type,
        'namespace': 'com.amazonaws.personalize.schema',
        'fields': field_definitions[dataset_type],
        'version': '1.0',
    }
    create_schema_response = personalize.create_schema(
        name=resource_name,
        schema=json.dumps(avro_schema),
    )
    schema_arn = create_schema_response['schemaArn']

    # Create the dataset inside the dataset group with that schema.
    create_dataset_response = personalize.create_dataset(
        name=resource_name,
        datasetType=dataset_type,
        datasetGroupArn=dataset_group_arn,
        schemaArn=schema_arn,
    )
    dataset_arn = create_dataset_response['datasetArn']

    # Start importing the CSV uploaded to S3; the job name carries the run timestamp.
    create_dataset_import_job_response = personalize.create_dataset_import_job(
        jobName=f'{resource_name}-{current_dt}',
        datasetArn=dataset_arn,
        dataSource={'dataLocation': data_locations[dataset_type]},
        roleArn=role_arn,
    )
    dataset_import_job_arns.append(create_dataset_import_job_response['datasetImportJobArn'])


In [None]:
# Wait until a data import has finished.
def wait_for_importing_data(job_arn, max_wait_interval=3 * 60 * 60):
    """Poll a Personalize dataset import job until it reaches a terminal status.

    Prints the job ARN, then checks the status every 60 seconds (printing '.'
    between checks) and prints the final status once it is ACTIVE or
    CREATE FAILED, followed by the failure reason in the latter case.

    Args:
        job_arn: ARN of the dataset import job to watch.
        max_wait_interval: Maximum number of seconds to wait.

    Returns:
        The terminal status string ('ACTIVE' or 'CREATE FAILED').

    Raises:
        TimeoutError: If no terminal status is reached within max_wait_interval.
    """
    print(job_arn)
    max_time = time.time() + max_wait_interval
    while time.time() < max_time:
        job = personalize.describe_dataset_import_job(
            datasetImportJobArn=job_arn
        )['datasetImportJob']
        status = job['status']
        if status in ('ACTIVE', 'CREATE FAILED'):
            print(status)
            if status == 'CREATE FAILED':
                print(job.get('failureReason', ''))
            return status

        time.sleep(60)
        print('.', end='')
    # Fail loudly so later cells do not run against data that never finished importing.
    raise TimeoutError(f'{job_arn} did not finish within {max_wait_interval} seconds')

# Block until every dataset import job has reached a terminal status.
for import_job_arn in dataset_import_job_arns:
    wait_for_importing_data(import_job_arn)

In [None]:
# Create one solution per recipe and start training a solution version for each.
# (Available recipes can be listed with personalize.list_recipes().)
recipe_arns = [
    'arn:aws:personalize:::recipe/aws-hrnn',
    'arn:aws:personalize:::recipe/aws-hrnn-metadata',
    'arn:aws:personalize:::recipe/aws-popularity-count'
]

solution_version_arns = []
for recipe_arn in recipe_arns:
    recipe_name = path.basename(recipe_arn)  # e.g. 'aws-hrnn'
    create_solution_response = personalize.create_solution(
        name=f'{prefix}-{recipe_name}',
        datasetGroupArn=dataset_group_arn,
        recipeArn=recipe_arn,
    )
    solution_arn = create_solution_response['solutionArn']

    # Creating a solution version is what actually trains the model.
    create_solution_version_response = personalize.create_solution_version(solutionArn=solution_arn)
    new_version_arn = create_solution_version_response['solutionVersionArn']
    solution_version_arns.append(new_version_arn)


def wait_for_creating_solution_version(solution_version_arn, max_wait_interval=3 * 60 * 60):
    """Poll a Personalize solution version until training reaches a terminal status.

    Prints the ARN, then checks the status every 60 seconds (printing '.'
    between checks) and prints the final status once it is ACTIVE or
    CREATE FAILED, followed by the failure reason in the latter case.

    Args:
        solution_version_arn: ARN of the solution version to watch.
        max_wait_interval: Maximum number of seconds to wait.

    Returns:
        The terminal status string ('ACTIVE' or 'CREATE FAILED').

    Raises:
        TimeoutError: If no terminal status is reached within max_wait_interval.
    """
    print(solution_version_arn)
    max_time = time.time() + max_wait_interval
    while time.time() < max_time:
        solution_version = personalize.describe_solution_version(
            solutionVersionArn=solution_version_arn
        )['solutionVersion']
        status = solution_version['status']
        if status in ('ACTIVE', 'CREATE FAILED'):
            print(status)
            if status == 'CREATE FAILED':
                print(solution_version.get('failureReason', ''))
            return status

        time.sleep(60)
        print('.', end='')
    # Fail loudly so metrics/inference cells do not run against an untrained version.
    raise TimeoutError(f'{solution_version_arn} did not finish within {max_wait_interval} seconds')


# Block until every solution version has finished training.
for version_arn in solution_version_arns:
    wait_for_creating_solution_version(version_arn)


In [None]:
# Offline evaluation metrics keyed by solution name (second-to-last ARN segment).
metrics = {
    version_arn.split('/')[-2]: personalize.get_solution_metrics(
        solutionVersionArn=version_arn
    )['metrics']
    for version_arn in solution_version_arns
}
pd.DataFrame.from_dict(metrics, orient='index')

# レコメンド

In [None]:
target_user_ids = list(users.index.values)

# Append two user IDs that never appear in the training data (untrained users).
largest_known_user_id = users.index.values.max()
target_user_ids.extend([largest_known_user_id + 1, largest_known_user_id + 2])
# One JSON object per line, as the batch-inference input file.
user_ids = [json.dumps({'userId': str(uid)}) for uid in target_user_ids]
bucket.Object(user_ids_json_s3_path).put(Body='\n'.join(user_ids))

In [None]:
user_ids_json_s3_uri = f's3://{bucket_name}/{user_ids_json_s3_path}'
# Batch output is written next to the input file, under <solution name>/<version id>/.
# S3 URIs always use '/', so the path is built explicitly rather than with os.path.
output_base_uri = user_ids_json_s3_uri.rsplit('/', 1)[0]

batch_job_arns = []
for solution_version_arn in solution_version_arns:
    # The last two ARN segments are the solution name and the version id.
    solution_name, solution_version = solution_version_arn.split('/')[-2:]
    response = personalize.create_batch_inference_job(
        jobName=solution_version,
        solutionVersionArn=solution_version_arn,
        numResults=100,
        jobInput={
            's3DataSource': {
                'path': user_ids_json_s3_uri
            }
        },
        jobOutput={
            's3DataDestination': {
                'path': f'{output_base_uri}/{solution_name}/{solution_version}/'
            }
        },
        roleArn=role_arn
    )
    batch_job_arns.append(response['batchInferenceJobArn'])

In [None]:
def wait_for_batch_inference_job(job_arn, max_wait_interval=3 * 60 * 60):
    """Poll a Personalize batch inference job until it reaches a terminal status.

    Prints the job ARN, then checks the status every 60 seconds (printing '.'
    between checks) and prints the final status once it is ACTIVE or
    CREATE FAILED, followed by the failure reason in the latter case.

    Args:
        job_arn: ARN of the batch inference job to watch.
        max_wait_interval: Maximum number of seconds to wait.

    Returns:
        The terminal status string ('ACTIVE' or 'CREATE FAILED').

    Raises:
        TimeoutError: If no terminal status is reached within max_wait_interval.
    """
    print(job_arn)
    max_time = time.time() + max_wait_interval
    while time.time() < max_time:
        job = personalize.describe_batch_inference_job(
            batchInferenceJobArn=job_arn
        )['batchInferenceJob']
        status = job['status']
        if status in ('ACTIVE', 'CREATE FAILED'):
            print(status)
            if status == 'CREATE FAILED':
                print(job.get('failureReason', ''))
            return status

        time.sleep(60)
        print('.', end='')
    # Fail loudly so the result-reading code does not look for output that was never written.
    raise TimeoutError(f'{job_arn} did not finish within {max_wait_interval} seconds')

def transform_recommendation(dic):
    """Convert one parsed batch-inference output record to ``(user_id, item_ids)``.

    Args:
        dic: One JSON object from the batch output file. The user ID is read
            from ``dic['input']['userId']`` and the ranked item IDs from
            ``dic['output']['recommendedItems']`` (both as numeric strings).

    Returns:
        ``(int user_id, list of int item IDs)`` with the recommendation order
        preserved. If the record carries no ``output`` (presumably a per-record
        error; TODO confirm against the batch output format), the list is empty.
    """
    output = dic.get('output') or {}
    return (
        int(dic['input']['userId']),
        [int(item_id) for item_id in output.get('recommendedItems', [])],
    )
        
# Wait for each batch job, download its output and parse it.
# recommends: {solution name: [(user_id, [item ids])]}
# user_base_recommendations: {solution name: {user_id: [item ids]}}
user_base_recommendations = {}
recommends = {}
for batch_job_arn in batch_job_arns:
    wait_for_batch_inference_job(batch_job_arn)
    response = personalize.describe_batch_inference_job(batchInferenceJobArn=batch_job_arn)
    job = response['batchInferenceJob']

    # The output file is <output path><input file name>.out. Turn the
    # s3://bucket/key/ URI into a bucket-relative key using '/' (not os.path).
    output_key_parts = [
        part for part in job['jobOutput']['s3DataDestination']['path'].split('/')[3:] if part
    ]
    input_file_name = job['jobInput']['s3DataSource']['path'].rsplit('/', 1)[-1]
    file_s3_path = '/'.join(output_key_parts + [f'{input_file_name}.out'])

    body = bucket.Object(file_s3_path).get()['Body'].read()
    solution_name = job['solutionVersionArn'].split('/')[-2]
    # Parse the JSON Lines output once (it was parsed twice before); skip blank lines.
    parsed = [transform_recommendation(json.loads(line)) for line in body.splitlines() if line.strip()]
    recommends[solution_name] = parsed
    user_base_recommendations[solution_name] = dict(parsed)

In [None]:
def fetch_recommendation(user_id, recommendations=None, item_table=None):
    """Return the recommended movies for *user_id*, side by side per solution.

    Args:
        user_id: Integer user ID as it appears in the batch-inference results.
        recommendations: Mapping ``{solution name: {user_id: [item IDs]}}``.
            Defaults to the global ``user_base_recommendations``.
        item_table: Item DataFrame indexed by ITEM_ID. Defaults to the global
            ``items``.

    Returns:
        DataFrame with ``(solution name, column)`` MultiIndex columns, one
        column group per solution (sorted by name). Rows follow each solution's
        recommendation rank, so ``[:20]`` is the top 20.
    """
    if recommendations is None:
        recommendations = user_base_recommendations
    if item_table is None:
        item_table = items
    recoms = {}
    for solution_name, per_user in sorted(recommendations.items()):
        # reindex keeps the ranked order of the IDs; an isin() filter would
        # return them sorted by ITEM_ID and lose the ranking.
        recoms[solution_name] = item_table.reindex(per_user[user_id]).reset_index()
    return pd.concat(recoms, axis=1)
    
    
def fetch_interaction(user_id):
    """Return *user_id*'s interactions, newest first, joined with movie title and genre.

    The result is indexed by ITEM_ID with columns TITLE, GENRE, RATING, TIMESTAMP.
    """
    user_rows = df.loc[df['USER_ID'] == user_id]
    with_item_info = user_rows.join(items, on='ITEM_ID')
    newest_first = with_item_info.sort_values('TIMESTAMP', ascending=False)
    return newest_first.set_index('ITEM_ID')[['TITLE', 'GENRE', 'RATING', 'TIMESTAMP']]


In [None]:
# Interaction history of a trained user (user 1), newest first.
fetch_interaction(1).head(20)

In [None]:
# Recommendations for a trained user (user 1).
fetch_recommendation(1).head(20)

In [None]:
# Recommendations for an untrained user (944, presumably max(USER_ID) + 1 appended above).
fetch_recommendation(944).head(20)

In [None]:
# Recommendations for an untrained user (945, presumably max(USER_ID) + 2 appended above).
fetch_recommendation(945).head(20)

# 再学習

In [None]:
# Retrain: start a new solution version for every solution in the dataset group.
solution_version_arns2 = []
response = personalize.list_solutions(datasetGroupArn=dataset_group_arn)
for solution in response['solutions']:
    solution_arn = solution['solutionArn']
    # Creating a solution version retrains the model.
    create_solution_version_response = personalize.create_solution_version(solutionArn=solution_arn)
    retrained_version_arn = create_solution_version_response['solutionVersionArn']
    solution_version_arns2.append(retrained_version_arn)


def wait_for_creating_solution_version(solution_version_arn, max_wait_interval=3 * 60 * 60):
    """Poll a Personalize solution version until training reaches a terminal status.

    Redefined in this retraining cell so the cell can be re-run on its own.
    Prints the ARN, then checks the status every 60 seconds (printing '.'
    between checks) and prints the final status once it is ACTIVE or
    CREATE FAILED, followed by the failure reason in the latter case.

    Args:
        solution_version_arn: ARN of the solution version to watch.
        max_wait_interval: Maximum number of seconds to wait.

    Returns:
        The terminal status string ('ACTIVE' or 'CREATE FAILED').

    Raises:
        TimeoutError: If no terminal status is reached within max_wait_interval.
    """
    print(solution_version_arn)
    max_time = time.time() + max_wait_interval
    while time.time() < max_time:
        solution_version = personalize.describe_solution_version(
            solutionVersionArn=solution_version_arn
        )['solutionVersion']
        status = solution_version['status']
        if status in ('ACTIVE', 'CREATE FAILED'):
            print(status)
            if status == 'CREATE FAILED':
                print(solution_version.get('failureReason', ''))
            return status

        time.sleep(60)
        print('.', end='')
    # Fail loudly so metrics/inference cells do not run against an untrained version.
    raise TimeoutError(f'{solution_version_arn} did not finish within {max_wait_interval} seconds')


# Block until every retrained solution version has finished training.
for retrained_arn in solution_version_arns2:
    wait_for_creating_solution_version(retrained_arn)

In [None]:
# Offline evaluation metrics of the retrained versions, keyed by solution name.
metrics = {
    version_arn.split('/')[-2]: personalize.get_solution_metrics(
        solutionVersionArn=version_arn
    )['metrics']
    for version_arn in solution_version_arns2
}
pd.DataFrame.from_dict(metrics, orient='index')

In [None]:
# Batch inference with the retrained versions. Output goes next to the input
# file under <solution name>/<version id>/, built with '/' because S3 URIs
# never use OS-specific separators.
batch_job_arns2 = []
for solution_version_arn in solution_version_arns2:
    # The last two ARN segments are the solution name and the version id.
    solution_name, solution_version = solution_version_arn.split('/')[-2:]
    output_base_uri2 = user_ids_json_s3_uri.rsplit('/', 1)[0]
    response = personalize.create_batch_inference_job(
        jobName=solution_version,
        solutionVersionArn=solution_version_arn,
        numResults=100,
        jobInput={
            's3DataSource': {
                'path': user_ids_json_s3_uri
            }
        },
        jobOutput={
            's3DataDestination': {
                'path': f'{output_base_uri2}/{solution_name}/{solution_version}/'
            }
        },
        roleArn=role_arn
    )
    batch_job_arns2.append(response['batchInferenceJobArn'])

In [None]:
# Wait for each retrained batch job, download its output and parse it.
# recommends2: {solution name: [(user_id, [item ids])]}
# user_base_recommendations2: {solution name: {user_id: [item ids]}}
user_base_recommendations2 = {}
recommends2 = {}
for batch_job_arn in batch_job_arns2:
    wait_for_batch_inference_job(batch_job_arn)
    response = personalize.describe_batch_inference_job(batchInferenceJobArn=batch_job_arn)
    job = response['batchInferenceJob']

    # The output file is <output path><input file name>.out. Turn the
    # s3://bucket/key/ URI into a bucket-relative key using '/' (not os.path).
    output_key_parts = [
        part for part in job['jobOutput']['s3DataDestination']['path'].split('/')[3:] if part
    ]
    input_file_name = job['jobInput']['s3DataSource']['path'].rsplit('/', 1)[-1]
    file_s3_path = '/'.join(output_key_parts + [f'{input_file_name}.out'])

    body = bucket.Object(file_s3_path).get()['Body'].read()
    solution_name = job['solutionVersionArn'].split('/')[-2]
    # Parse the JSON Lines output once (it was parsed twice before); skip blank lines.
    parsed = [transform_recommendation(json.loads(line)) for line in body.splitlines() if line.strip()]
    recommends2[solution_name] = parsed
    user_base_recommendations2[solution_name] = dict(parsed)

In [None]:
def fetch_recommendation2(user_id, recommendations=None, item_table=None):
    """Return the retrained models' recommendations for *user_id*, side by side.

    Args:
        user_id: Integer user ID as it appears in the batch-inference results.
        recommendations: Mapping ``{solution name: {user_id: [item IDs]}}``.
            Defaults to the global ``user_base_recommendations2``.
        item_table: Item DataFrame indexed by ITEM_ID. Defaults to the global
            ``items``.

    Returns:
        DataFrame with ``(solution name, column)`` MultiIndex columns, one
        column group per solution (sorted by name). Rows follow each solution's
        recommendation rank, so ``[:20]`` is the top 20.
    """
    if recommendations is None:
        recommendations = user_base_recommendations2
    if item_table is None:
        item_table = items
    recoms = {}
    for solution_name, per_user in sorted(recommendations.items()):
        # reindex keeps the ranked order of the IDs; an isin() filter would
        # return them sorted by ITEM_ID and lose the ranking.
        recoms[solution_name] = item_table.reindex(per_user[user_id]).reset_index()
    return pd.concat(recoms, axis=1)


In [None]:
fetch_recommendation2(1).head(20)  # trained user 1, after retraining

In [None]:
fetch_recommendation2(944).head(20)  # untrained user 944, after retraining