# Implementing NFT Price Class Prediction System with SageMaker Built-In Algorithm
_**Making Price Class Suggestion for NFT Using Factorization Machines**_

--- 

*This work is based on content from [Implementing Recommender System notebook](https://github.com/aws-samples/sagemaker-ml-workflow-with-apache-airflow/blob/master/notebooks/amazon-video-recommender_using_fm_algo.ipynb)*

---

## Table of Contents

1. [Background](#Background)
1. [Setup](#Setup)
1. [Data](#Data)
  1. [Explore](#Explore)
  1. [Clean](#Clean)
  1. [Prepare](#Prepare)
1. [Model Training](#Model-Training)
1. [Model Inference](#Model-Inference)
  1. [Real-Time Inference](#Real-Time-Inference)
  1. [Batch Inference](#Batch-Inference)
1. [Evaluate Model Performance](#Evaluate-Model-Performance)
1. [Model Tuning](#Model-Tuning)
1. [Wrap-up](#Wrap-up)
  1. [Clean-Up](#Clean-up-(optional))

---

## Background

In many ways, recommender systems were a catalyst for the current popularity of machine learning.  One of Amazon's earliest successes was the "Customers who bought this, also bought..." feature, while the million dollar Netflix Prize spurred research, raised public awareness, and inspired numerous other data science competitions.

Recommender systems can utilize a multitude of data sources and ML algorithms, and most combine various unsupervised, supervised, and reinforcement learning techniques into a holistic framework.  However, the core component is almost always a model which predicts a user's rating (or purchase) for a certain item based on that user's historical ratings of similar items as well as the behavior of other similar users.  The minimal required dataset for this is a history of user item ratings.  In our case, we'll use 1 to 5 star ratings from over 2M Amazon customers on over 160K digital videos. More details on this dataset can be found at its [AWS Public Datasets page](https://s3.amazonaws.com/amazon-reviews-pds/readme.html).

Matrix factorization has been the cornerstone of most user-item prediction models.  This method starts with the large, sparse, user-item ratings in a single matrix, where users index the rows, and items index the columns.  It then seeks to find two lower-dimensional, dense matrices which, when multiplied together, preserve the information and relationships in the larger matrix.

![image](./factorization.png)

Matrix factorization has been extended and generalized with deep learning and embeddings.  These techniques allows us to introduce non-linearities for enhanced performance and flexibility.  This notebook will fit a neural network-based model to generate recommendations for the Amazon video dataset.  It will start by exploring our data in the notebook, training a model on the data and fit our model using a SageMaker managed training cluster.  We'll then deploy to an endpoint and check our method.

We will also see how the tasks in the machine learning pipeline can be orchestrated and automated through Apache Airflow integration with Sagemaker.

---

## Setup

_This notebook was created and tested on an ml.t2.xlarge notebook instance._

Let's start by specifying:

- The S3 bucket and prefix that you want to use for training and model data.  This should be within the same region as the Notebook Instance, training, and hosting.
- The IAM role arn used to give training and hosting access to your data. See the documentation for how to create these.  Note, if more than one role is required for notebook instances, training, and/or hosting, please replace the `get_execution_role()` call with the appropriate full IAM role arn string(s).

In [5]:
bucket = 's3a://airflow-sagemaker-cd6fd720'
#prefix = 'sagemaker/fm-recsys'

import sagemaker

from sagemaker.tuner import HyperparameterTuner, ContinuousParameter
from sagemaker.analytics import HyperparameterTuningJobAnalytics, TrainingJobAnalytics

role = sagemaker.get_execution_role()
sess = sagemaker.Session()
smclient = boto3.Session().client('sagemaker')

ValueError: Must setup local AWS configuration with a region supported by SageMaker.

Now let's load the Python libraries we'll need for the remainder of this example notebook.

In [22]:
#Essentials
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import pandas as pd
from pandas import Series, DataFrame
from pandas.api.types import CategoricalDtype
pd.options.display.max_columns = None
import numpy as np; np.random.seed(2022)
import random

#Image creation
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib.patches as mpatches
from matplotlib import pyplot
import plotly.express as px
import plotly.graph_objects as go

#Image display
from IPython.display import Image as image
from IPython.display import display

#Preprocessing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.compose import make_column_transformer
from sklearn.pipeline import Pipeline
from sklearn.pipeline import make_pipeline

#Models
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
import xgboost
from xgboost import XGBClassifier
from sklearn.svm import SVC
from sklearn import svm
from sklearn.naive_bayes import GaussianNB
from sklearn.base import clone
from sklearn.ensemble import StackingClassifier
from sklearn.ensemble import BaggingClassifier

#Metrics of accuracy
from sklearn import metrics
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import cross_val_predict
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score, precision_score, recall_score
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score
from numpy import mean
from numpy import std

#Other
from nbformat import current
import itertools as it
import io
import os
import sys
import time
from scipy.sparse import lil_matrix
import boto3
import json
import sagemaker.amazon.common as smac
from sagemaker.predictor import json_deserializer


- use nbformat for read/write/validate public API
- use nbformat.vX directly to composing notebooks of a particular version



In [7]:
# plot aesthetics
sns.set(color_codes=True)
sns.set_context('paper')
five_thirty_eight = ["#30a2da", "#fc4f30", "#e5ae38", "#6d904f", "#8b8b8b",]
sns.set_palette(five_thirty_eight)

%matplotlib inline

---
## Data

### Explore

Let's start by bringing in our dataset from an S3 public bucket.  As mentioned above, this contains 1 to 5 star ratings from over 2M Amazon customers on over 160K digital videos.  More details on this dataset can be found at its [AWS Public Datasets page](https://s3.amazonaws.com/amazon-reviews-pds/readme.html).

_Note, because this dataset is over a half gigabyte, the load from S3 may take ~10 minutes.  Also, since Amazon SageMaker Notebooks start with a 5GB persistent volume by default, and we don't need to keep this data on our instance for long, we'll bring it to the temporary volume (which has up to 20GB of storage)._

In [None]:
#!mkdir /tmp/recsys/
#!aws s3 cp s3://amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz /tmp/recsys/

Let's read the data into a Pandas DataFrame so that we can begin to understand it.

In [18]:
import s3fs
import fastparquet as fp
s3 = s3fs.S3FileSystem()
fs = s3fs.core.S3FileSystem()

s3_path = "s3a://data-eng-ind/parquet-files/all_collections_API.parquet"
all_paths_from_s3 = fs.glob(path=s3_path)

myopen = s3.open
#use s3fs as the filesystem
fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
#convert to pandas dataframe
df = fp_obj.to_pandas()

In [15]:
df = df.dropna(how='all', axis=1)
df = df.dropna(how='all')
df.head()

Unnamed: 0,collection_status,only_proxied_transfers,is_subject_to_whitelist,opensea_buyer_fee_basis_points,opensea_seller_fee_basis_points,featured,hidden,require_email,image_url,large_image_url,...,owner_number,day_avg_price,month_avg_price,week_avg_price,total_volume,total_sales,total_supply,average_price,max_price,min_price
0,not_requested,False,False,0,250,False,False,False,,,...,7356207.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0
1,not_requested,False,False,0,250,False,False,False,,,...,3050053.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0
2,not_requested,False,False,0,250,False,False,False,,,...,7471608.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,0
3,not_requested,False,False,0,250,False,False,False,,,...,5909669.0,0.0,0.0,0.0,0.0,0.0,50.0,0.0,0.0,0
4,not_requested,False,False,0,250,False,False,False,,,...,7396100.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0


In [24]:
# Descriptive statistics of the transposed table
df.describe().transpose()

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
owner_number,4872.0,6077133.0,1773308.0,25.0,5324539.0,6939835.0,7422735.25,7492468.0
day_avg_price,7500.0,0.004297307,0.1723171,0.0,0.0,0.0,0.0,12.5
month_avg_price,7500.0,0.007524136,0.2387651,0.0,0.0,0.0,0.0,12.5
week_avg_price,7500.0,0.007524136,0.2387651,0.0,0.0,0.0,0.0,12.5
total_volume,7500.0,0.01112951,0.3685486,0.0,0.0,0.0,0.0,25.0
total_sales,7500.0,0.05306667,2.373591,0.0,0.0,0.0,0.0,204.0
total_supply,7500.0,34.39133,353.2359,0.0,1.0,2.0,5.0,10169.0
average_price,7500.0,0.007524136,0.2387651,0.0,0.0,0.0,0.0,12.5
max_price,7500.0,0.06522798,3.397752,0.0,0.0,0.0,0.0,275.0
min_price,7500.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [25]:
# Show number of all existing data types in the dataframe
print(df.dtypes.astype(str).value_counts())

object     15
float64     9
bool        6
int64       1
dtype: int64


In [None]:
# Read data from a PostgreSQL table and load into a pandas DataFrame
import psycopg2 as pg
from sqlalchemy import create_engine

# Create an engine instance
alchemyEngine = create_engine('postgresql+psycopg2://test:@127.0.0.1', pool_recycle=3600)

# Connect to PostgreSQL server
dbConnection = alchemyEngine.connect()

# Read data from PostgreSQL database table and load into a DataFrame instance
df1 = pd.read_sql("select * from nfts.assets", dbConnection)
pd.set_option('display.expand_frame_repr', False)

# Close the database connection
dbConnection.close()

In [1]:
engine = pg.connect("dbname='opensea' user='marfapopova21' host='opensea.ckmusmy93z05.eu-west-2.rds.amazonaws.com' port='5432' password='qwerty123'")
df1 = pd.read_sql('select * from nfts.assets', con=engine)

OperationalError: connection to server at "opensea.ckmusmy93z05.eu-west-2.rds.amazonaws.com" (35.176.71.104), port 5432 failed: Operation timed out
	Is the server running on that host and accepting TCP/IP connections?


In [None]:
df.head()

We can see this dataset includes information like:

- `marketplace`: 2-letter country code (in this case all "US").
- `customer_id`: Random identifier that can be used to aggregate reviews written by a single author.
- `review_id`: A unique ID for the review.
- `product_id`: The Amazon Standard Identification Number (ASIN).  `http://www.amazon.com/dp/<ASIN>` links to the product's detail page.
- `product_parent`: The parent of that ASIN.  Multiple ASINs (color or format variations of the same product) can roll up into a single parent parent.
- `product_title`: Title description of the product.
- `product_category`: Broad product category that can be used to group reviews (in this case digital videos).
- `star_rating`: The review's rating (1 to 5 stars).
- `helpful_votes`: Number of helpful votes for the review.
- `total_votes`: Number of total votes the review received.
- `vine`: Was the review written as part of the [Vine](https://www.amazon.com/gp/vine/help) program?
- `verified_purchase`: Was the review from a verified purchase?
- `review_headline`: The title of the review itself.
- `review_body`: The text of the review.
- `review_date`: The date the review was written.

For this example, let's limit ourselves to `customer_id`, `product_id`, and `star_rating`.  Including additional features in our recommendation system could be beneficial, but would require substantial processing (particularly the text data) which would take us beyond the scope of this notebook.

*Note: we'll keep `product_title` on the dataset to help verify our recommendations later in the notebook, but it will not be used in algorithm training.*

In [None]:
df = df[['customer_id', 'product_id', 'star_rating', 'product_title']]

Because most people haven't seen most videos, and people rate fewer videos than we actually watch, we'd expect our data to be sparse.  Our algorithm should work well with this sparse problem in general, but we may still want to clean out some of the long tail.  Let's look at some basic percentiles to confirm.

In [None]:
customers = reviews['customer_id'].value_counts()
products = reviews['product_id'].value_counts()

quantiles = [0, 0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.96, 0.97, 0.98, 0.99, 1]
product_q = pd.DataFrame(zip(quantiles, products.quantile(quantiles)), columns=["quantile", "products"])
customer_q = pd.DataFrame(zip(quantiles, customers.quantile(quantiles)), columns=["quantile", "customers"])
# product_q.tail(10)
# customer_q.tail(10)

In [None]:
axp = sns.barplot(x="quantile", y="products", data=product_q, palette=five_thirty_eight)

In [None]:
axc = sns.barplot(x="quantile", y="customers", data=customer_q, palette=five_thirty_eight)

As we can see, only about 5% of customers have rated 5 or more videos, and only 25% of videos have been rated by 9+ customers.

### Clean

Let's filter out this long tail and remove any duplicate reviews (same product and customer).

In [None]:
customers = customers[customers >= 5]
products = products[products >= 10]

print("# of records before removing the long tail = {:10d}".format(reviews.shape[0]))
reduced_df = reviews.merge(pd.DataFrame({'customer_id': customers.index})).merge(pd.DataFrame({'product_id': products.index}))
print("# of records after  removing the long tail = {:10d}".format(reduced_df.shape[0]))
reduced_df = reduced_df.drop_duplicates(['customer_id', 'product_id'])
print("# of records after  removing duplicates    = {:10d}".format(reduced_df.shape[0]))

Now, we'll recreate our customer and product lists since there are customers with more than 5 reviews, but all of their reviews are on products with less than 5 reviews (and vice versa).

In [None]:
customers = reduced_df['customer_id'].value_counts()
products = reduced_df['product_id'].value_counts()

Next, we'll number each user and item, giving them their own sequential index.  This will allow us to hold the information in a sparse format where the sequential indices indicate the row and column in our ratings matrix.

In [None]:
customer_index = pd.DataFrame({'customer_id': customers.index, 'customer': np.arange(customers.shape[0])})
product_index = pd.DataFrame({'product_id': products.index, 
                              'product': np.arange(products.shape[0])})

reduced_df = reduced_df.merge(customer_index).merge(product_index)
reduced_df.head()

Let's look at the feature dimension size whch will required for preparing the training and test data sets

In [None]:
nb_customer = reduced_df['customer'].max() + 1
nb_products = reduced_df['product'].max() + 1
feature_dim = nb_customer + nb_products
print(nb_customer, nb_products, feature_dim)

Trim down the data set to include only customer, product, star_rating which is all we need for the training algorithm to build the model

In [None]:
product_df = reduced_df[['customer', 'product', 'star_rating']]
product_df.head()

### Prepare

We will be using SageMaker's implementation of Factorization Machines (FM) for building a recommender system. The algorithm expects float32 tensors in protobuf whereas the data sets are pandas dataframe on disk. Most of the conversion effort is handled by the Amazon SageMaker Python SDK.

The FM algorithm will utilize sparse input and since the data sets are dense matrix, it has to be converted a sparse matrix with one-hot encoded feature vectors with customers and products. Thus, each sample in the data set will be a wide boolean vector with 178729 feature space (140344 customer + 38385 products) with only two values set to 1 with respect to the customer and product.

Following are the next steps

1. Split the cleaned data set into train and test data sets.
2. For each set, build a sparse matrix with one-hot encoded feature vectors (customer + products) and a label vector with star ratings.
3. Convert both the sets to protobuf encoded files.
4. Copy these files to an Amazon S3 bucket.
5. Configure and run a Factorization Machines training job on Amazon SageMaker.
6. Deploy the corresponding model to an endpoint.
7. Run predictions on test data set and validate

#### Split into Training and Test Data Sets

Let's start by [splitting](https://docs.scipy.org/doc/numpy/reference/generated/numpy.split.html) in training, validation and test sets.  This will allow us to estimate the model's accuracy on videos our customers rated, but wasn't included in our training. We will use validation data set specifically for tuning model hyper-parameters.

In [None]:
train_df, validate_df, test_df = np.split(
    product_df.sample(frac=1), 
    [int(.6*len(product_df)), int(.8*len(product_df))])

In [None]:
print("# of rows in the training data set   = {:10d}".format(train_df.shape[0]))
print("# of rows in the validation data set = {:10d}".format(validate_df.shape[0]))
print("# of rows in the test data set       = {:10d}".format(test_df.shape[0]))

In [None]:
train_df.head()

Let's get the feature dimensions by adding total number of (unique) customers and products

In [None]:
# get feature dimension
all_df = pd.concat([train_df, validate_df, test_df])
nb_customer = np.unique(all_df['customer'].values).shape[0]
nb_products = np.unique(all_df['product'].values).shape[0]
feature_dim = nb_customer + nb_products
print("# of customers = {:10d}".format(nb_customer))
print("# of products  = {:10d}".format(nb_products))
print("# of features  = {:10d}".format(feature_dim))

#### Building Sparse One-Hot Encoded Matrix

Our training matrix is now even sparser: Of all 183,833,321,511 values (1028559 rows * 178729 columns), only 2,057,118 are non-zero (1,028,559*2). In other words, the matrix is 99.99% sparse. Storing this as a dense matrix would be a massive waste of both storage and computing power. To avoid this, use a scipy.lil_matrix sparse matrix for features and a numpy array for ratings.

Let's define a function that takes the data set and returns a sparse feature matrix and numpy array with ratings.

In [None]:
def convert_sparse_matrix(df, nb_rows, nb_customer, nb_products):
    # dataframe to array
    df_val = df.values

    # determine feature size
    nb_cols = nb_customer + nb_products
    print("# of rows = {}".format(str(nb_rows)))
    print("# of cols = {}".format(str(nb_cols)))

    # extract customers and ratings
    df_X = df_val[:, 0:2]
    # Features are one-hot encoded in a sparse matrix
    X = lil_matrix((nb_rows, nb_cols)).astype('float32')
    df_X[:, 1] = nb_customer + df_X[:, 1]
    coords = df_X[:, 0:2]
    X[np.arange(nb_rows), coords[:, 0]] = 1
    X[np.arange(nb_rows), coords[:, 1]] = 1

    # create label with ratings
    Y = df_val[:, 2].astype('float32')

    # validate size and shape
    print(X.shape)
    print(Y.shape)
    assert X.shape == (nb_rows, nb_cols)
    assert Y.shape == (nb_rows, )

    return X, Y

In [None]:
print("Convert training data set to one-hot encoded sparse matrix")
train_X, train_Y = convert_sparse_matrix(train_df, train_df.shape[0], nb_customer, nb_products)
print("Convert validation data set to one-hot encoded sparse matrix")
validate_X, validate_Y = convert_sparse_matrix(validate_df, validate_df.shape[0], nb_customer, nb_products)
print("Convert test data set to one-hot encoded sparse matrix")
test_X, test_Y = convert_sparse_matrix(test_df, test_df.shape[0], nb_customer, nb_products)

#### Convert to Protobuf format and Upload to S3

We will use Sagemaker's utility function [`write_spmatrix_to_sparse_tensor`](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/amazon/common.py) to convert scipy sparse matrix to protobuf format.

In [None]:
def save_as_protobuf(X, Y, bucket, key):
    """Converts features and predictions matrices to recordio protobuf and
       writes to S3

    Args:
        X:
          2D numpy matrix with features
        Y:
          1D numpy matrix with predictions
        bucket:
          s3 bucket where recordio protobuf file will be staged
        prefix:
          s3 url prefix to stage prepared data to use for training the model
        key:
          protobuf file name to be staged

    Returns:
        s3 url with key to the protobuf data
    """
    buf = io.BytesIO()
    smac.write_spmatrix_to_sparse_tensor(buf, X, Y)
    buf.seek(0)
    obj = '{}'.format(key)
    boto3.resource('s3').Bucket(bucket).Object(obj).upload_fileobj(buf)
    return 's3://{}/{}'.format(bucket, obj)

In [None]:
s3_train_path = save_as_protobuf(train_X, train_Y, bucket, 'prepare/train/train.protobuf')
print("Training data set in protobuf format uploaded at {}".format(s3_train_path))
s3_val_path = save_as_protobuf(validate_X, validate_Y, bucket, 'prepare/validate/validate.protobuf')
print("Validation data set in protobuf format uploaded at {}".format(s3_val_path))

We will chunk the test data to avoid the payload size issues when performing batch predictions.

In [None]:
def chunk(x, batch_size):
    """split array into chunks of batch_size
    """
    chunk_range = range(0, x.shape[0], batch_size)
    chunks = [x[p: p + batch_size] for p in chunk_range]
    return chunks

In [None]:
test_x_chunks = chunk(test_X, 10000)
test_y_chunks = chunk(test_Y, 10000)
N = len(test_x_chunks)
for i in range(N):
    test_data = save_as_protobuf(
        test_x_chunks[i],
        test_y_chunks[i],
        bucket,
        "prepare/test/test_" + str(i) + ".protobuf")
    print(test_data)

---

## Model Training

Once we have the data preprocessed and available in the correct format for training, the next step is to actually train the model using the data. We'll use the Amazon SageMaker Python SDK to kick off training and monitor status until it is completed. In this example that takes between 4-7 minutes for 3-10 epochs. 

First, let's get the Sagemaker Factorization Machine container

In [None]:
from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(boto3.Session().region_name, 'factorization-machines')

Next kick off the base estimator, making sure to pass in the necessary hyperparameters. Notice:

- `feature_dim` is set to 178729, which is the number of customers + products in the training data set.
- `predictor_type` is set to 'regressor' since we are trying to predict the rating
- `mini_batch_size` is set to 200. This value can be tuned for relatively minor improvements in fit and speed, but selecting a reasonable value relative to the dataset is appropriate in most cases.
- `num_factors` is set to 64. Factorization machines find a lower dimensional representation of the interactions for all features. Making this value smaller provides a more parsimonious model, closer to a linear model, but may sacrifice information about interactions. Making it larger provides a higher-dimensional representation of feature interactions, but adds computational complexity and can lead to overfitting. In a practical application, time should be invested to tune this parameter to the appropriate value.

In [None]:
%time

output_location = 's3://{}/train/'.format(bucket)
s3_train_path = 's3://{}/prepare/train/train.protobuf'.format(bucket)
s3_val_path = 's3://{}/prepare/validate/validate.protobuf'.format(bucket)

fm = sagemaker.estimator.Estimator(container,
                                   role, 
                                   train_instance_count=1, 
                                   train_instance_type='ml.c5.4xlarge',
                                   output_path=output_location,
                                   sagemaker_session=sess)

fm.set_hyperparameters(feature_dim=feature_dim,
                      predictor_type='regressor',
                      mini_batch_size=200,
                      num_factors=512,
                      bias_lr=0.02,
                      epochs=10)

In [None]:
fm.fit({'train': s3_train_path,'test': s3_val_path}, wait=False)

Amazon SageMaker built-in algorithms automatically compute and emit a variety of model training, evaluation, and validation metrics that can be captured from Cloudwatch using Sagemaker SDK. Since we are using FM built-in algorithm with predictor type as `regressor`, we can capture RMSE (root-mean-square error) of the model that measures the differences between the predicted values and the actual values.

Let's capture the RMSE of the model

In [None]:
training_job_name = fm._current_job_name
metric_name = 'train:rmse:epoch'

In [None]:
# run this cell to check current status of training job
fm_training_job_result = smclient.describe_training_job(TrainingJobName=training_job_name)

status = fm_training_job_result['TrainingJobStatus']
if status != 'Completed':
    print('Reminder: the training job has not been completed.')
else:
    print('The training job is completed')

In [None]:
# plug-in the training job name and metrics to be captured
metrics_dataframe = TrainingJobAnalytics(training_job_name=training_job_name,metric_names=[metric_name]).dataframe()
metrics_dataframe

In [None]:
plt = metrics_dataframe.plot(kind='line', figsize=(12,5), x='timestamp', y='value', style='b.', legend=False)
plt.set_ylabel(metric_name);

As the number of epochs increased, RMSE goes down which is a good sign that the predicted values are getting closer to the actual values from the test set. We can increase number of epochs or change the hyperparameters and try to tweak the model. Let's try to deploy this model and make predictions to see how close the predictions are. Then we can run a hyper-parameter tuning job to determine the best model.

#### Utility Functions

We will define some common utility functions here that will be used during inference and evaluating results

In [None]:
def convert_to_protobuf(X, Y=None):
    buf = io.BytesIO()
    smac.write_spmatrix_to_sparse_tensor(buf, X, Y)
    buf.seek(0)
    return buf

In [None]:
def convert_sparse_matrix_X(df, nb_rows, nb_customer, nb_products):
    # dataframe to array
    df_val = df.values

    # determine feature size
    nb_cols = nb_customer + nb_products
    
    # extract customers and ratings
    df_X = df_val[:,0:2]
    # Features are one-hot encoded in a sparse matrix
    X = lil_matrix((nb_rows, nb_cols)).astype('float32')
    df_X[:,1] = nb_customer + df_X[:,1]
    coords = df_X[:,0:2]
    X[np.arange(nb_rows), coords[:, 0]] = 1
    X[np.arange(nb_rows), coords[:, 1]] = 1

    # validate size and shape
    assert X.shape == (nb_rows, nb_cols)
    
    return X

---

## Inference 

### Real-Time Inference 

Since the model is trained, all it takes to deploy the model is a Sagemaker API call `deploy()` that creates the model package, sets up endpoint configuration and finally creates the endpoint.

In [None]:
fm_predictor = fm.deploy(instance_type='ml.c4.xlarge', initial_instance_count=1)

Predictions could be done by sending HTTP POST requests from a separate web service, but to keep things easy, we'll just use the `.predict()` method from the SageMaker Python SDK. The API expects JSON or RecordIO format for  request and JSON for response data.

In [None]:
fm_predictor.content_type = 'application/x-recordio-protobuf'

Let's test the model with sample ratings from test data set using `predict()` API call

In [None]:
test_pb = convert_to_protobuf(test_X[1000:1010]).getvalue()

In [None]:
response = fm_predictor.predict(test_pb)
response

In [None]:
predicted = [round(r['score'], 2) for r in json.loads(response)['predictions']]
predicted

In [None]:
results_df = pd.DataFrame(zip(test_Y[1000:1010], predicted), columns = ["actual_rating", "predicted_rating"])
results_df

---

### Batch Inference

Here we will perform batch inference on the test data set prepared earlier (chunking into multiple protobuf files). To run batch transform, create a model package for the transform endpoint 

- Create the model from the training estimator

In [None]:
fm_model = fm.create_model()

- Perform batch inference on the test data set and save results to S3

In [None]:
fm_transformer = fm_model.transformer(
    instance_type='ml.c4.xlarge', 
    instance_count=1, 
    strategy="MultiRecord", 
    output_path="s3://{}/transform/".format(bucket)
)

In [None]:
fm_transformer.transform(
    data="s3://{}/prepare/test/".format(bucket), 
    data_type='S3Prefix', 
    content_type="application/x-recordio-protobuf")

In [None]:
print('Waiting for transform job: ' + fm_transformer.latest_transform_job.job_name)
fm_transformer.wait()

- Inference results will be stored in a separate file for each test file chunk. Let's download the results from S3 and merge them

In [None]:
def download_from_s3(bucket, key):
    s3 = boto3.resource('s3')
    obj = s3.Object( bucket, key)
    content = obj.get()['Body'].read()
    return content

In [None]:
test_preds = []
for i in range(N):
    key = 'transform/test_' + str(i) + '.protobuf.out'
    response = download_from_s3(bucket, key)
    result = [json.loads(row)["score"] for row in response.split("\n") if len(row) > 0]
    test_preds.extend(result)

In [None]:
test_preds = np.array(test_preds)

In [None]:
test_preds.shape

---

## Evaluate Model Performance

Let's start by calculating a naive baseline to approximate how well our model is doing.  The simplest estimate would be to assume every user item rating is just the average rating over all ratings.

*Note, we could do better by using each individual video's average, however, in this case it doesn't really matter as the same conclusions would hold.*

In [None]:
print('Naive MSE:', np.mean((test_df['star_rating'] - np.mean(train_df['star_rating'])) ** 2))

Now, we'll calculate predictions for our test dataset.

In [None]:
print('MSE:', np.mean((test_Y - test_preds) ** 2))

We can see that our neural network and embedding model produces substantially better results (~1.44 vs 1.13 on the mean square error).

For recommender systems, subjective accuracy also matters.  Let's get some recommendations for a random user to see if they make intuitive sense.

In [None]:
df_customer_6 = reduced_df[reduced_df['customer'] == 6].sort_values(['star_rating', 'product'], ascending=[False, True])
pd.concat((df_customer_6.head(10), df_customer_6.tail(10)))

As we can see, user #6 seems to like sprawling dramamtic television series and sci-fi, but they dislike silly comedies.

Now we'll loop through and predict user #6's ratings for every common video in the catalog, to see which ones we'd recommend and which ones we wouldn't.

In [None]:
def create_payload(cust_id, nb_customer, nb_products, product_index):
    # prepare payload for user #6
    c = [cust_id] * nb_products
    p = product_index['product'].values
    x = pd.DataFrame(zip(c,p))
    p_x = convert_sparse_matrix_X(x, x.shape[0], nb_customer, nb_products)
    x_pb = convert_to_protobuf(p_x)
    return x_pb

In [None]:
x_pb = create_payload(6, nb_customer, nb_products, product_index)

In [None]:
# make predictions using end-point created in Real-Time Inference
response = fm_predictor.predict(x_pb)
predictions = [round(r['score'], 2) for r in json.loads(response)['predictions']]

In [None]:
predictions_df = pd.DataFrame({'product': product_index['product'],
                            'prediction': predictions})

In [None]:
df_results_cust_6 = df_customer_6.merge(predictions_df, on=['product'])[['customer', 'customer_id', 'product', 'product_id', 'product_title', 'star_rating', 'prediction']]
df_results_cust_6.sort_values(['prediction', 'product'], ascending=[False, True])

Indeed, our predicted highly rated shows have some well-reviewed TV dramas and some sci-fi.  Meanwhile, our bottom rated shows include goofball comedies.

*Note, because of random initialization in the weights, results on subsequent runs may differ slightly.*

Let's confirm that we no longer have almost perfect correlation in recommendations with user #7.

In [None]:
x_pb = create_payload(7, nb_customer, nb_products, product_index)
response = fm_predictor.predict(x_pb)
predictions_user7 = [round(r['score'], 2) for r in json.loads(response)['predictions']]

In [None]:
plt.scatter(predictions_df['prediction'], np.array(predictions_user7))
plt.show()

---

## Model Tuning

So far, we have developed a deep learning model to predict customer ratings but the model could be improved further by various techniques. In this section, let's see if tuning the hyper-parameters of Factorization Machine is going to make the model any better.

In [None]:
output_location = 's3://{}/train/'.format(bucket)
s3_train_path = 's3://{}/prepare/train/train.protobuf'.format(bucket)
s3_val_path = 's3://{}/prepare/validate/validate.protobuf'.format(bucket)

- Let's create the estimator with Factorization Machines container similar to how we defined in training the model. Also, set the initial hyper-parameters that we know worked before.

In [None]:
fm_estimator = sagemaker.estimator.Estimator(container,
                                   role, 
                                   train_instance_count=1, 
                                   train_instance_type='ml.c5.4xlarge',
                                   output_path=output_location,
                                   sagemaker_session=sess)

In [None]:
fm_estimator.set_hyperparameters(
    feature_dim=feature_dim,
    predictor_type='regressor',
    mini_batch_size=200,
    num_factors=512,
    bias_lr=0.02,
    epochs=20)

- Find best hyperparameters with Sagemaker's Automatic Model Tuning. Following hyperparameters will be tuned
    - ***factors_lr:*** The learning rate for factorization terms.
    - ***factors_init_sigma:*** The standard deviation for initialization of factorization terms. Takes effect if factors_init_method is set to normal.
    

- Define the hyperparameter tuning ranges to be searched and set the objective metric

In [None]:
hyperparameter_ranges=  {
    "factors_lr": ContinuousParameter(0.0001, 0.2),
    "factors_init_sigma": ContinuousParameter(0.0001, 1)
}

- Now that we have our ranges defined we want to define our success metric

In [None]:
objective_metric_name = "test:rmse"
objective_type = "Minimize"

- Start hyperparameter tuning job with the ranges defined

In [None]:
fm_tuner = HyperparameterTuner(
    estimator=fm_estimator,
    objective_metric_name=objective_metric_name, 
    hyperparameter_ranges=hyperparameter_ranges,
    objective_type=objective_type,
    max_jobs=10,
    max_parallel_jobs=2
)

In [None]:
timestamp_prefix = time.strftime("%Y%m%d-%H%M%S", time.gmtime())
fm_tuner_job_name = 'hpo-fm-' + timestamp_prefix

In [None]:
fm_tuner.fit({'train': s3_train_path, 'test': s3_val_path}, job_name=fm_tuner_job_name, wait=False)

- Track hyperparameter tuning job progress

In [None]:
# run this cell to check current status of hyperparameter tuning job
tuning_job_result = smclient.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=fm_tuner_job_name)

status = tuning_job_result['HyperParameterTuningJobStatus']
if status != 'Completed':
    print('Reminder: the tuning job has not been completed.')
    
job_count = tuning_job_result['TrainingJobStatusCounters']['Completed']
print("%d training jobs have completed" % job_count)
    
is_minimize = (tuning_job_result['HyperParameterTuningJobConfig']['HyperParameterTuningJobObjective']['Type'] != 'Maximize')
objective_name = tuning_job_result['HyperParameterTuningJobConfig']['HyperParameterTuningJobObjective']['MetricName']

* Analyze Hyper-Parameter Tuning Job Results

In [None]:
# plug-in the training job name and metrics to be captured
fm_tuner_analytics = HyperparameterTuningJobAnalytics(hyperparameter_tuning_job_name=fm_tuner_job_name)
df_fm_tuner_metrics = fm_tuner_analytics.dataframe()
df_fm_tuner_metrics

In [None]:
# analyze using seaborn
plt = df_fm_tuner_metrics.plot(kind='line', figsize=(12,5), x='TrainingStartTime', 
                             y='FinalObjectiveValue', 
                             style='b.', legend=False)
plt.set_ylabel(objective_metric_name);

- Best Factorization Machine Model after Hyper-Parameter Optimization

In [None]:
print("fm_tuner_job_name: " + fm_tuner_job_name)
fm_tuner = HyperparameterTuner.attach(fm_tuner_job_name)

fm_tuner_analytics = HyperparameterTuningJobAnalytics(hyperparameter_tuning_job_name=fm_tuner_job_name)
df_fm_tuner_metrics = fm_tuner_analytics.dataframe()

fm_best_model_name = fm_tuner.best_training_job()
print("fm_best_model_name: " + fm_best_model_name)

fm_model_info = smclient.describe_training_job(TrainingJobName=fm_best_model_name)

In [None]:
df_fm_tuner_metrics[df_fm_tuner_metrics['TrainingJobName']==fm_best_model_name]

- Let's evaluate the results with the best training job from hyper-parameter tuning job.

We can deploy the endpoint using hyper-parameter tuning job and test the predictions. 

In [None]:
fm = sagemaker.estimator.Estimator.attach(fm_best_model_name)

We can re-run the cells in Batch Inference and Evaluation section to evaluate the performance of the model with tuned hyper-parameters. 

Assuming batch inference is carried out, let's calculate predictions for our test dataset and see if we do better than the training job with default hyper-parameters.

In [None]:
print('MSE:', np.mean((test_Y - test_preds) ** 2))

---

## Wrap-up

In this example, we developed a deep learning model to predict customer ratings.  This could serve as the foundation of a recommender system in a variety of use cases.  However, there are many ways in which it could be improved.  For example we did very little with:
- hyperparameter tuning
- controlling for overfitting (early stopping, dropout, etc.)
- testing whether binarizing our target variable would improve results
- including other information sources (video genres, historical ratings, time of review)
- adjusting our threshold for user and item inclusion 

In addition to improving the model, we could improve the engineering by:
- Setting the context and key value store up for distributed training
- Fine tuning our data ingestion (e.g. num_workers on our data iterators) to ensure we're fully utilizing our GPU
- Thinking about how pre-processing would need to change as datasets scale beyond a single machine

Beyond that, recommenders are a very active area of research and techniques from active learning, reinforcement learning, segmentation, ensembling, and more should be investigated to deliver well-rounded recommendations.

### Clean-up (optional)

Let's finish by deleting our endpoint to avoid stray hosting charges.

In [None]:
endpoint_name_contains = ['-fm-', 'factorization-machines-']
for name in endpoint_name_contains:
    endpoints = smclient.list_endpoints(NameContains=name, StatusEquals='InService')
    endpoint_names = [r['EndpointName'] for r in endpoints['Endpoints']]
    for endpoint_name in endpoint_names:
        print("Deleting endpoint: " + endpoint_name)
        smclient.delete_endpoint(EndpointName=endpoint_name)

---