# Amazon SageMaker の Factorization Machines を使用したレコメンダ システムの構築

---

---

## 背景

- Factorization machines = 因数分解機 （以下、Factorization machines）
- レコメンダ システムは、アマゾン、並びにネットフリックス Prizeの例からもみれるように、機械学習のカタリストとなりました。
- ユーザー・アイテムの matrix factorization は、コア・中核的な手法です
- Factorization machines は、linear prediction（線形予測） とペアワイズ フィーチャの相互作用の因数分解された表現を組み合わせます。

$$\hat{r} = w_0 + \sum_{i} {w_i x_i} + \sum_{i} {\sum_{j > i} {\langle v_i, v_j \rangle x_i x_j}}$$

- Amazon SageMaker の built-in Factorization Machines は高度にスケーラブルです。

---

## セットアップ

以下のセルを実行する前に：
1. マネージメントコンソールにて、SageMakerがホストするノートブックインスタンスを立ち上げて下さい。
2. SageMaker の IAM ポリシーをそのノートブックインスタンスに追加して、S3 のread/writeアクセスを許可するように設定します。

セル１と２は：
3. S3バケット、セッション等の作成。(first code cell)
4. 必要なライブラリのインポート (second code cell)

In [1]:
import sagemaker

sess = sagemaker.Session()
bucket = sess.default_bucket()
base = 'DEMO-loft-recommender'
prefix = 'sagemaker/' + base

role = sagemaker.get_execution_role()

In [2]:
import sagemaker
import os
import pandas as pd
import numpy as np
import boto3
import json
import io
import matplotlib.pyplot as plt
import sagemaker.amazon.common as smac
from sagemaker.predictor import json_deserializer
from scipy.sparse import csr_matrix

---

## データ

[Amazon Reviews AWS Public Dataset](https://s3.amazonaws.com/amazon-reviews-pds/readme.html)
- 1 to 5 star ratings
- 2百万以上の Amazon customers
- 16十万以上の digital videos 

In [3]:
!mkdir /tmp/recsys/
!aws s3 cp s3://amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz /tmp/recsys/

mkdir: cannot create directory ‘/tmp/recsys/’: File exists
download: s3://amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz to ../../../../tmp/recsys/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz


In [4]:
df = pd.read_csv('/tmp/recsys/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz', delimiter='\t',error_bad_lines=False)
df.head()

b'Skipping line 92523: expected 15 fields, saw 22\n'
b'Skipping line 343254: expected 15 fields, saw 22\n'
b'Skipping line 524626: expected 15 fields, saw 22\n'
b'Skipping line 623024: expected 15 fields, saw 22\n'
b'Skipping line 977412: expected 15 fields, saw 22\n'
b'Skipping line 1496867: expected 15 fields, saw 22\n'
b'Skipping line 1711638: expected 15 fields, saw 22\n'
b'Skipping line 1787213: expected 15 fields, saw 22\n'
b'Skipping line 2395306: expected 15 fields, saw 22\n'
b'Skipping line 2527690: expected 15 fields, saw 22\n'


Unnamed: 0,marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date
0,US,12190288,R3FU16928EP5TC,B00AYB1482,668895143,Enlightened: Season 1,Digital_Video_Download,5,0,0,N,Y,I loved it and I wish there was a season 3,I loved it and I wish there was a season 3... ...,2015-08-31
1,US,30549954,R1IZHHS1MH3AQ4,B00KQD28OM,246219280,Vicious,Digital_Video_Download,5,0,0,N,Y,As always it seems that the best shows come fr...,As always it seems that the best shows come fr...,2015-08-31
2,US,52895410,R52R85WC6TIAH,B01489L5LQ,534732318,After Words,Digital_Video_Download,4,17,18,N,Y,Charming movie,"This movie isn't perfect, but it gets a lot of...",2015-08-31
3,US,27072354,R7HOOYTVIB0DS,B008LOVIIK,239012694,Masterpiece: Inspector Lewis Season 5,Digital_Video_Download,5,0,0,N,Y,Five Stars,excellant this is what tv should be,2015-08-31
4,US,26939022,R1XQ2N5CDOZGNX,B0094LZMT0,535858974,On The Waterfront,Digital_Video_Download,5,0,0,N,Y,Brilliant film from beginning to end,Brilliant film from beginning to end. All of t...,2015-08-31


データセットの列:

- `marketplace`: 2 文字の国コード (このデータセットは、全てUS,「米国」となっています）。
- `customer_id`: ランダムに割り当てたお客様番号又はID。 
- `review_id`: レビューをユニークに識別できるID。
- `product_id`: Amazon 標準識別番号 (ASIN)。  
- `product_parent`: そのASINの親。 Multiple ASINs (color or format variations of the same product) can roll up into a single parent parent.
- `product_title`: 商品のタイトル説明。
- `product_category`: グループレビューに使用できる幅広い製品カテゴリ（このデータセットの場合はデジタルビデオ）
- `star_rating`: レビューの評価 (1 から 5 つ星)。
- `helpful_votes`: レビューに役立つ投票数。
- `total_votes`: レビューが受け取った投票総数。
- `vine`: レビューは [Vine](https://www.amazon.com/gp/vine/help) のプログラムの一部として書かれていますか？
- `verified_purchase`: 確認済みの購入からのレビューですか？
- `review_headline`: レビュー自体のタイトル。
- `review_body`: レビューのテキスト。
- `review_date`: レビューが書き込まれた日付。

以下のセルで、使用しないフィールドを削除します。

In [5]:
df = df[['customer_id', 'product_id', 'product_title', 'star_rating', 'review_date']]

ほとんどのユーザーは、ほとんどの映画を評価しません。 - Check our long tail

In [6]:
df#customers

Unnamed: 0,customer_id,product_id,product_title,star_rating,review_date
0,12190288,B00AYB1482,Enlightened: Season 1,5,2015-08-31
1,30549954,B00KQD28OM,Vicious,5,2015-08-31
2,52895410,B01489L5LQ,After Words,4,2015-08-31
3,27072354,B008LOVIIK,Masterpiece: Inspector Lewis Season 5,5,2015-08-31
4,26939022,B0094LZMT0,On The Waterfront,5,2015-08-31
5,4772040,B0112OSOQE,Rick and Morty Season 2,5,2015-08-31
6,12910040,B000NPE5SA,Africa Screams,4,2015-08-31
7,38805573,B00XWV4QXG,Entourage: Season 7,3,2015-08-31
8,37100714,B00X8UKOUK,Catastrophe - Season 1,2,2015-08-31
9,41234409,B00OOKXTFU,The Worricker Trilogy Season 1,3,2015-08-31


In [None]:
products

In [None]:
customers = df['customer_id'].value_counts()
products = df['product_id'].value_counts()

quantiles = [0, 0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.96, 0.97, 0.98, 0.99, 1]
print('customers\n', customers.quantile(quantiles))
print('products\n', products.quantile(quantiles))

多数の映画を評価していないお客様を除外します。Filter out customers who haven't rated many movies

In [None]:
customers = customers[customers >= 5]
products = products[products >= 10]

reduced_df = df.merge(pd.DataFrame({'customer_id': customers.index})).merge(pd.DataFrame({'product_id': products.index}))

お客様と映画の連番インデックス（sequential index）を作成します。

In [None]:
customers = reduced_df['customer_id'].value_counts()
products = reduced_df['product_id'].value_counts()

In [None]:
customers

In [None]:
products

In [None]:
customer_index = pd.DataFrame({'customer_id': customers.index, 'user': np.arange(customers.shape[0])})
product_index = pd.DataFrame({'product_id': products.index, 
                              'item': np.arange(products.shape[0]) + customer_index.shape[0]})

reduced_df = reduced_df.merge(customer_index).merge(product_index)
reduced_df.head()

最初のレビューからの日数をカウントする（トレンドをキャプチャするfeatureとして使えます）

In [None]:
customer_index

In [None]:
reduced_df['review_date'] = pd.to_datetime(reduced_df['review_date'])
customer_first_date = reduced_df.groupby('customer_id')['review_date'].min().reset_index()
customer_first_date.columns = ['customer_id', 'first_review_date']

In [None]:
reduced_df = reduced_df.merge(customer_first_date)
reduced_df['days_since_first'] = (reduced_df['review_date'] - reduced_df['first_review_date']).dt.days
reduced_df['days_since_first'] = reduced_df['days_since_first'].fillna(0)

In [None]:
reduced_df

学習用のデータととテスト用のデータに分けます。

In [None]:
test_df = reduced_df.groupby('customer_id').last().reset_index()

train_df = reduced_df.merge(test_df[['customer_id', 'product_id']], 
                            on=['customer_id', 'product_id'], 
                            how='outer', 
                            indicator=True)
train_df = train_df[(train_df['_merge'] == 'left_only')]

- Factorization machinesは、以下のようなデータをインプットとして使います。
  - Sparse matrix（疎行列又は、スパース行列）。成分のほとんどが零である行列。
  - ターゲット変数は、映画に対するそのお客様・ユーザーの評価です。
  - ユーザーのワンホットエンコーディング ($N$ features)
  - 映画のワンホットエンコーディング ($M$ features)

|Rating|User1|User2|...|UserN|Movie1|Movie2|Movie3|...|MovieM|Feature1|Feature2|...|
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|4|1|0|...|0|1|0|0|...|0|20|2.2|...|
|5|1|0|...|0|0|1|0|...|0|17|9.1|...|
|3|0|1|...|0|1|0|0|...|0|3|11.0|...|
|4|0|1|...|0|0|0|1|...|0|15|6.4|...|


- Wouldn't want to hold this full matrix in memory
  - Create a sparse matrix
  - Designed to work efficiently with CPUs. Some parts of training for more dense matrices can be parallelized with GPUs

In [None]:
def to_csr_matrix(df, num_users, num_items):
    feature_dim = num_users + num_items + 1
    data = np.concatenate([np.array([1] * df.shape[0]),
                           np.array([1] * df.shape[0]),
                           df['days_since_first'].values])
    row = np.concatenate([np.arange(df.shape[0])] * 3)
    col = np.concatenate([df['user'].values,
                          df['item'].values,
                          np.array([feature_dim - 1] * df.shape[0])])
    return csr_matrix((data, (row, col)), 
                      shape=(df.shape[0], feature_dim), 
                      dtype=np.float32)

In [None]:
train_csr = to_csr_matrix(train_df, customer_index.shape[0], product_index.shape[0])
test_csr = to_csr_matrix(test_df, customer_index.shape[0], product_index.shape[0])

In [None]:
train_df

以下、SageMakerのfactorization machinesが必要とするスパース・レコードIOに包まれたprotobufに変換します。

In [None]:
def to_s3_protobuf(csr, label, bucket, prefix, channel='train', splits=10):
    indices = np.array_split(np.arange(csr.shape[0]), splits)
    for i in range(len(indices)):
        index = indices[i]
        buf = io.BytesIO()
        smac.write_spmatrix_to_sparse_tensor(buf, csr[index, ], label[index])
        buf.seek(0)
        boto3.client('s3').upload_fileobj(buf, bucket, '{}/{}/data-{}'.format(prefix, channel, i))

In [None]:
to_s3_protobuf(train_csr, train_df['star_rating'].values.astype(np.float32), bucket, prefix)
to_s3_protobuf(test_csr, test_df['star_rating'].values.astype(np.float32), bucket, prefix, channel='test', splits=1)

---

## 学習

- トレーニング・ジョブを実行するための [SageMaker Python SDK](https://github.com/aws/sagemaker-python-sdk) estimatorを作成し、以下の項目を指定します。
  - アルゴリズムが保存されているコンテナのイメージ
  - IAM ロール
  - ハードウェアのセットアップ
  - アウトプットを保存する S3 のバケット
  - アルゴリズムのハイパーパラメータ
    - `feature_dim`: $N + M + 1$ (追加の feature は、トレントをキャプチャする `days_since_first`)
    - `num_factors`: 因数分解された交互作用で減少させた dimension
    - `epochs`: データセットを通過させる回数
- `.fit()` は S3 の学習用とテスト用データを指定し、トレーニングジョブを開始します。

In [None]:
fm = sagemaker.estimator.Estimator(
    sagemaker.amazon.amazon_estimator.get_image_uri(boto3.Session().region_name, 'factorization-machines', 'latest'),
    role, 
    train_instance_count=4, 
    train_instance_type='ml.c5.2xlarge',
    output_path='s3://{}/{}/output'.format(bucket, prefix),
    base_job_name=base,
    sagemaker_session=sess)

fm.set_hyperparameters(
    feature_dim=customer_index.shape[0] + product_index.shape[0] + 1,
    predictor_type='regressor',
    mini_batch_size=1000,
    num_factors=256,
    epochs=3)

fm.fit({'train': sagemaker.s3_input('s3://{}/{}/train/'.format(bucket, prefix), distribution='ShardedByS3Key'), 
        'test': sagemaker.s3_input('s3://{}/{}/test/'.format(bucket, prefix), distribution='FullyReplicated')})

---

## ホスト・デプロイ

学習を終えたモデルを、リアルタイムの本番環境エンドポイントにデプロイします。

In [None]:
fm_predictor = fm.deploy(instance_type='ml.m4.xlarge', initial_instance_count=1)

呼び出しリクエストの際に使う、メモリ内のデータをシリアル化する為に predictor を設定します。　

In [None]:
def fm_serializer(df):
    feature_dim = customer_index.shape[0] + product_index.shape[0] + 1
    js = {'instances': []}
    for index, data in df.iterrows():
        js['instances'].append({'data': {'features': {'values': [1, 1, data['days_since_first']],
                                                      'keys': [data['user'], data['item'], feature_dim - 1],
                                                      'shape': [feature_dim]}}})
    return json.dumps(js)

In [None]:
fm_predictor.content_type = 'application/json'
fm_predictor.serializer = fm_serializer
fm_predictor.deserializer = json_deserializer

単一のユーザー・アイテムのリアルタイム予測は、以下の用に行います。

In [None]:
test_df.head(1)

In [None]:
fm_predictor.predict(test_df.head(1))

エンドポイントをクリーンアップします。

In [None]:
fm_predictor.delete_endpoint()

---
##  ノートブックインスタンスを停止して下さい! 
---

# 最後に

- 大規模なデータセットに対するレコメンダ システムを迅速かつ正確に構築
- 拡張する為にフィーチャを追加する　Add more features to extend
- 他の方法との比べてみる
- 2つのモデルをアンサンブルとして使ってみる