## Importing Libraries

In [6]:
import pandas as pd
import numpy as np
import sagemaker
import boto3,io,json
from sklearn.model_selection import train_test_split
import sagemaker.amazon.common as smac
from sagemaker import get_execution_role
from sagemaker.predictor import json_deserializer
from scipy.sparse import lil_matrix
from sagemaker.amazon.amazon_estimator import get_image_uri
sage_client = boto3.Session().client('sagemaker')

## Downloading dataset from S3 bucket

In [17]:
!aws s3 cp s3://movies-mlready-bucket/run-1607268992801-part-r-00000.csv .

download: s3://movies-mlready-bucket/run-1607268992801-part-r-00000.csv to ./run-1607268992801-part-r-00000.csv


## Reading the CSV file into dataframe, and converting date type to integer

In [2]:
df = pd.read_csv('run-1607268992801-part-r-00000.csv')

In [3]:
df = df.drop(df[df.userid == 'userId'].index)

In [4]:
df.userid = df.userid.astype(int)
df.movieid = df.movieid.astype(int)
df.rating = df.rating.astype(float)
df.movieid = df.movieid.astype(int)

In [5]:
df = df.drop(['timestamp'], axis=1)

In [19]:
train, test = train_test_split(df, test_size=0.33,random_state=42)

In [22]:
nb_users= train['userid'].max()
nb_movies=train['movieid'].max()
nb_features=nb_users+nb_movies
nb_ratings_train=len(train.index)
nb_ratings_test=len(test.index)
print (" # of users: ", nb_users)
print (" # of movies: ", nb_movies)
print (" Training Count: ", nb_ratings_train)
print (" Testing Count: ", nb_ratings_test)
print (" Features (# of users + # of movies): ", nb_features)

 # of users:  610
 # of movies:  193609
 Training Count:  67560
 Testing Count:  33276
 Features (# of users + # of movies):  194219


## Helper function to create LIL Matrix with users, movies, and label vector. Only with ratings more than 4 were create labels

In [23]:
def loadDataset(df, lines, columns):
    # Features are one-hot encoded in a sparse matrix
    X = lil_matrix((lines, columns)).astype('float32')
    # Labels are stored in a vector
    Y = []
    line=0
    for index, row in df.iterrows():
            X[line,row['userid']-1] = 1
            X[line, nb_users+(row['movieid']-1)] = 1
            if int(row['rating']) >= 4:
                Y.append(1)
            else:
                Y.append(0)
            line=line+1

    Y=np.array(Y).astype('float32')            
    return X,Y


X_train, Y_train = loadDataset(train, nb_ratings_train, nb_features)
X_test, Y_test = loadDataset(test, nb_ratings_test, nb_features)

In [24]:
print(X_train.shape)
print(Y_train.shape)
assert X_train.shape == (nb_ratings_train, nb_features)
assert Y_train.shape == (nb_ratings_train, )
zero_labels = np.count_nonzero(Y_train)
print("Training labels: %d zeros, %d ones" % (zero_labels, nb_ratings_train-zero_labels))

print(X_test.shape)
print(Y_test.shape)
assert X_test.shape  == (nb_ratings_test, nb_features)
assert Y_test.shape  == (nb_ratings_test, )
zero_labels = np.count_nonzero(Y_test)
print("Test labels: %d zeros, %d ones" % (zero_labels, nb_ratings_test-zero_labels))

(67560, 194219)
(67560,)
Training labels: 66627 zeros, 933 ones
(33276, 194219)
(33276,)
Test labels: 32839 zeros, 437 ones


## Data repositories for input/output & model artifacts

In [25]:
#Change this value to your own bucket name
bucket = 'movies-mlready-bucket'
prefix = 'FM-classifier'

train_key      = 'train.protobuf'
train_prefix   = '{}/{}'.format(prefix, 'train')

test_key       = 'test.protobuf'
test_prefix    = '{}/{}'.format(prefix, 'test')


output_prefix  = 's3://{}/{}/output'.format(bucket, prefix)

## Helper function to convert the training and test data into record-io

In [26]:
def writeDatasetToProtobuf(X, bucket, prefix, key, d_type, Y=None):
    buf = io.BytesIO()
    if d_type == "sparse":
        smac.write_spmatrix_to_sparse_tensor(buf, X, labels=Y)
    else:
        smac.write_numpy_to_dense_tensor(buf, X, labels=Y)
        
    buf.seek(0)
    obj = '{}/{}'.format(prefix, key)
    boto3.resource('s3').Bucket(bucket).Object(obj).upload_fileobj(buf)
    return 's3://{}/{}'.format(bucket,obj)
    
fm_classifier_train_data_path = writeDatasetToProtobuf(X_train, bucket, train_prefix, train_key, "sparse", Y_train)
fm_classifier_test_data_path  = writeDatasetToProtobuf(X_test, bucket, test_prefix, test_key, "sparse", Y_test)

  
print ("Training data S3 path: ",fm_train_data_path)
print ("Test data S3 path: ",fm_test_data_path)
print ("FM model output S3 path: {}".format(output_prefix))

Training data S3 path:  s3://movies-mlready-bucket/fm/train/train.protobuf
Test data S3 path:  s3://movies-mlready-bucket/fm/test/test.protobuf
FM model output S3 path: s3://movies-mlready-bucket/fm/output


## Create a Factorization Machines estimator

In [27]:
instance_type='ml.m5.large'
role = 'arn:aws:iam::719009365707:role/role_sagemaker'
fm_classifier = sagemaker.estimator.Estimator(get_image_uri(boto3.Session().region_name, "factorization-machines"),
                                   role, 
                                   train_instance_count=1, 
                                   train_instance_type=instance_type,
                                   base_job_name='FM-classifier',
                                   output_path=output_prefix,
                                   sagemaker_session=sagemaker.Session())

fm_classifier.set_hyperparameters(feature_dim=nb_features,
                      predictor_type='binary_classifier',
                      mini_batch_size=1000,
                      num_factors=64,
                      epochs=200)

## HPO Tuner and objective metric name

In [28]:
hyperparameter_ranges = {'bias_init_sigma': ContinuousParameter(1e-8, 32),
                         'bias_lr': ContinuousParameter(1e-8,32),
                         'bias_wd': ContinuousParameter(1e-8,32),
                         'linear_init_sigma': ContinuousParameter(1e-8, 32),
                         'linear_lr': ContinuousParameter(1e-8,32),
                         'linear_wd': ContinuousParameter(1e-8,32),
                         'factors_init_sigma': ContinuousParameter(1e-8, 32),
                         'factors_lr': ContinuousParameter(1e-8,32),
                         'factors_wd': ContinuousParameter(1e-8,32)}
                    

## Objective Metric Name = Binary classification accuracy

In [29]:
objective_metric_name = 'test:binary_classification_accuracy'

## HPO Tuner Initialization and fitting the tuner

In [31]:
tuner_classifier = HyperparameterTuner(fm,
                            objective_metric_name,
                            hyperparameter_ranges,
                            base_tuning_job_name='FM-classifier',
                            max_jobs=5,
                            max_parallel_jobs=3,
                            early_stopping_type='Auto')

In [32]:
tuner_classifier.fit({'train': fm_classifier_train_data_path, 'test': fm_classifier_test_data_path})

## Creating an endpoint for Predictions

In [33]:
fm_classifier_pred = tuner_classifier.deploy(instance_type='ml.m5.large', 
                            initial_instance_count=1,)

2020-12-22 20:55:25 Starting - Preparing the instances for training
2020-12-22 20:55:25 Downloading - Downloading input data
2020-12-22 20:55:25 Training - Training image download completed. Training in progress.
2020-12-22 20:55:25 Uploading - Uploading generated training model
2020-12-22 20:55:25 Completed - Training job completed[34mDocker entrypoint called with argument(s): train[0m
[34mRunning default environment configuration script[0m
  from numpy.testing import nosetester[0m
[34m[12/22/2020 20:52:30 INFO 140608586069824] Reading default configuration from /opt/amazon/lib/python2.7/site-packages/algorithm/resources/default-conf.json: {u'factors_lr': u'0.0001', u'linear_init_sigma': u'0.01', u'epochs': 1, u'_wd': u'1.0', u'_num_kv_servers': u'auto', u'use_bias': u'true', u'factors_init_sigma': u'0.001', u'_log_level': u'info', u'bias_init_method': u'normal', u'linear_init_method': u'normal', u'linear_lr': u'0.001', u'factors_init_method': u'normal', u'_tuning_objective_metr

[34m#metrics {"Metrics": {"Max Batches Seen Between Resets": {"count": 1, "max": 68, "sum": 68.0, "min": 68}, "Number of Batches Since Last Reset": {"count": 1, "max": 68, "sum": 68.0, "min": 68}, "Number of Records Since Last Reset": {"count": 1, "max": 67560, "sum": 67560.0, "min": 67560}, "Total Batches Seen": {"count": 1, "max": 12513, "sum": 12513.0, "min": 12513}, "Total Records Seen": {"count": 1, "max": 12432040, "sum": 12432040.0, "min": 12432040}, "Max Records Seen Between Resets": {"count": 1, "max": 67560, "sum": 67560.0, "min": 67560}, "Reset Count": {"count": 1, "max": 185, "sum": 185.0, "min": 185}}, "EndTime": 1608670494.109703, "Dimensions": {"Host": "algo-1", "Meta": "training_data_iter", "Operation": "training", "Algorithm": "factorization-machines", "epoch": 183}, "StartTime": 1608670493.349659}
[0m
[34m[12/22/2020 20:54:54 INFO 140608586069824] #throughput_metric: host=algo-1, train throughput=88875.9181845 records/second[0m
[34m[12/22/2020 20:54:54 INFO 14060

Training seconds: 228
Billable seconds: 228
---------------!

## Helper function to serialize Inference requests

In [34]:
def fm_serializer(data):
    js = {"instances": []}
    for row in data:
        js["instances"].append({"features": row.tolist()})
    return json.dumps(js).encode()


fm_classifier_pred.serializer = fm_serializer
fm_classifier_pred.deserializer = json_deserializer

## Making Predictions

In [35]:
fm_classifier_result = fm_classifier_pred.predict(X_test[1002:1009].toarray())


In [36]:
predictions = pd.DataFrame(fm_classifier_result)

{'predictions': [{'predicted_label': 1.0, 'score': 0.9017724394798279}, {'predicted_label': 1.0, 'score': 0.9028304219245911}, {'predicted_label': 1.0, 'score': 0.902442455291748}, {'predicted_label': 1.0, 'score': 0.9020636081695557}, {'predicted_label': 1.0, 'score': 0.9020634293556213}, {'predicted_label': 1.0, 'score': 0.9030724167823792}, {'predicted_label': 1.0, 'score': 0.901659369468689}]}


In [37]:
Actuals = pd.DataFrame(Y_test[1002:1009])

[1. 1. 1. 1. 1. 1. 1.]
