# Recommender by ALS

With collaborative filtering, we make predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption is that if user A has the same opinion as user B on one issue, A is more likely to share B’s opinion on a different issue x than the opinion of a user chosen at random.

First, people rate different items (such as videos, images, or games). Then the system predicts a user’s rating for an item they have not rated yet. These predictions are built from the existing ratings of other users whose ratings are similar to the active user’s.
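The idea above can be sketched in plain Python as a similarity-weighted average. This is only an illustration under assumptions: the toy `ratings` dictionary, the user and item names, and the choice of cosine similarity are all hypothetical and are not part of this notebook's pipeline.

```python
from math import sqrt

# Hypothetical toy ratings: user -> {item: rating}
ratings = {
    "alice": {"video_a": 5.0, "video_b": 3.0, "video_c": 4.0},
    "bob":   {"video_a": 5.0, "video_b": 3.0, "video_d": 5.0},
    "carol": {"video_a": 1.0, "video_b": 5.0, "video_d": 2.0},
}

def cosine_similarity(a, b):
    """Cosine similarity computed over the items both users rated."""
    common = set(a) & set(b)
    if not common:
        return 0.0
    dot = sum(a[i] * b[i] for i in common)
    norm_a = sqrt(sum(a[i] ** 2 for i in common))
    norm_b = sqrt(sum(b[i] ** 2 for i in common))
    return dot / (norm_a * norm_b)

def predict(user, item):
    """Similarity-weighted average of other users' ratings for the item."""
    num = den = 0.0
    for other, other_ratings in ratings.items():
        if other == user or item not in other_ratings:
            continue
        weight = cosine_similarity(ratings[user], other_ratings)
        num += weight * other_ratings[item]
        den += weight
    return num / den if den else None

# alice has not rated video_d; bob rates like alice, carol does not
prediction = predict("alice", "video_d")
```

Because alice's ratings match bob's more closely than carol's, the prediction lands nearer to bob's rating of `video_d` than to carol's.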

Matrix factorization is a class of collaborative filtering algorithms used in recommender systems. Matrix factorization algorithms work by decomposing the user-item interaction matrix into the product of two lower-dimensional rectangular matrices.

Alternating least squares (ALS) matrix factorization: the idea is to take a large (potentially huge) matrix and factor it into a smaller representation. The factorization is computed by alternating least squares: one factor matrix is held fixed while the other is solved for as a least-squares problem, and then the roles swap. We end up with two lower-dimensional matrices whose product approximates the original one. ALS is built into Apache Spark.
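To make the alternating step concrete, here is a minimal pure-Python sketch restricted to rank 1, where each least-squares update has a closed form. It is not Spark's implementation; the toy `observed` ratings, the regularization value `lam`, and the iteration count are assumptions chosen for illustration.

```python
# Observed ratings only: (user_index, item_index) -> rating (toy data)
observed = {
    (0, 0): 5.0, (0, 1): 3.0,
    (1, 0): 4.0, (1, 2): 2.0,
    (2, 1): 1.5, (2, 2): 1.25,
}
n_users, n_items = 3, 3
lam = 0.01  # L2 regularization strength (assumed value)

def objective(u, v):
    """Regularized squared error over the observed entries."""
    fit = sum((r - u[i] * v[j]) ** 2 for (i, j), r in observed.items())
    reg = lam * (sum(x * x for x in u) + sum(x * x for x in v))
    return fit + reg

def als_rank1(n_iters=20):
    u = [1.0] * n_users  # one factor per user
    v = [1.0] * n_items  # one factor per item
    history = []
    for _ in range(n_iters):
        # Hold item factors fixed; solve least squares for each user factor.
        for i in range(n_users):
            num = sum(r * v[j] for (ui, j), r in observed.items() if ui == i)
            den = lam + sum(v[j] ** 2 for (ui, j) in observed if ui == i)
            u[i] = num / den
        # Hold user factors fixed; solve least squares for each item factor.
        for j in range(n_items):
            num = sum(r * u[i] for (i, ij), r in observed.items() if ij == j)
            den = lam + sum(u[i] ** 2 for (i, ij) in observed if ij == j)
            v[j] = num / den
        history.append(objective(u, v))
    return u, v, history

u, v, history = als_rank1()
predicted_missing = u[0] * v[2]  # user 0 never rated item 2
```

Each half-step minimizes the objective exactly over one set of factors, so the values in `history` never increase. The product `u[i] * v[j]` then fills in the entries that were never observed.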


In [1]:
sc

In [2]:
import os

ROOT_DIR = "./"
DATA_DIR = os.path.join(ROOT_DIR, 'data')
MODEL_DIR = os.path.join(ROOT_DIR, 'model')

In [None]:
"""
Utilities definition
"""

import hashlib

class Utils:
    
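    '''
    Map a string ID to an integer in [0, 10**8).
    Note: distinct IDs can collide, so the result is not guaranteed to be unique.
    '''
    @staticmethod
    def hashToInt(s):
        # hashlib requires bytes in Python 3, so encode the string first
        return int(hashlib.sha1(s.encode('utf-8')).hexdigest(), 16) % (10 ** 8)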

Test
Utils.hashToInt(u'A141HP4LYPWMSR')
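Reducing the hash modulo 10 ** 8 means two different IDs can map to the same integer. The sketch below estimates how often that happens using the standard birthday-problem approximation. The helper names and the figure of one million IDs are hypothetical and used only for illustration.

```python
import hashlib

def hash_to_int(s, buckets=10 ** 8):
    """Python 3 variant of the hashing helper: encode to bytes before hashing."""
    return int(hashlib.sha1(s.encode("utf-8")).hexdigest(), 16) % buckets

def expected_colliding_pairs(n_ids, buckets=10 ** 8):
    """Birthday-problem estimate: about n * (n - 1) / (2 * m) pairs share a bucket."""
    return n_ids * (n_ids - 1) / (2 * buckets)

sample_id = hash_to_int("A141HP4LYPWMSR")
pairs_for_one_million = expected_colliding_pairs(1_000_000)
```

For one million distinct IDs the estimate is roughly 5,000 colliding pairs, so a lookup table from integer ID back to the original ID may not be one-to-one. A collision-free alternative would be to assign consecutive integers to the distinct IDs.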

## 1. Data preprocessing

In [4]:
""" 
Read ratings data from file to dataframe
"""

import os

ratings_path = os.path.join(DATA_DIR, 'ratings')
data = spark.read.json(ratings_path)
data.show()
# data.count()

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:40853)
Traceback (most recent call last):
  File "/usr/lib/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:40853)

In [5]:
# Python 3 does not allow tuple unpacking in lambda parameters,
# so access the Row fields by name instead
data_ = data.rdd \
    .map(lambda row: (row.asin, Utils.hashToInt(row.asin), row.overall,
                      row.reviewerID, Utils.hashToInt(row.reviewerID))) \
    .toDF(["asin", "asin_int", "score", "reviewerID", "reviewerID_int"])
data_.show()

In [7]:
"""

Store the mapping from each original user ID and movie ID
    to its integer ID for later use
    
"""

# reviewerID identifies the user; asin identifies the movie (product)
users = data_.select("reviewerID", "reviewerID_int").dropDuplicates()
movies = data_.select("asin", "asin_int").dropDuplicates()

user_path = os.path.join(DATA_DIR, 'users')
movie_path = os.path.join(DATA_DIR, 'movies')

users.write.json(user_path, mode='overwrite', compression='gzip')
movies.write.json(movie_path, mode='overwrite', compression='gzip')

In [8]:
# Test 
movies.show()

+----------+--------+
|      asin|asin_int|
+----------+--------+
|078322947X|58326392|
|0962870080|  734429|
|1573411205|96591480|
|1578071976|81174131|
|1587270811|44613020|
|1933424478|85302331|
|6300213900|36094112|
|6301115880|10482173|
|6301334523|95755197|
|6301651847|37079934|
|6301752694|69284628|
|6301928423|75403443|
|6302091071|66425136|
|6302148332|53442745|
|6302120616|41664247|
|6302181755|71163828|
|6302641934|14528673|
|630301402X|28613535|
|6303038913|94364605|
|6303017991|47660785|
+----------+--------+
only showing top 20 rows



In [10]:
"""

Split dataset into trainset and testset
    ` Trainset: 80% of the records, chosen at random
    ` Testset: the remaining 20%
    
X_train: (user, movie) pair of each review from Trainset
X_test: (user, movie) pair of each review from Testset

"""

(train_data, test_data) = data_.randomSplit([0.8, 0.2])

# (user, product) pairs, in the order expected by model.predictAll
X_train = train_data.select("reviewerID_int", "asin_int").rdd.map(tuple)
X_test = test_data.select("reviewerID_int", "asin_int").rdd.map(tuple)

train_data.count(), test_data.count()


(7012597, 1752971)

## 2. Building a Recommender using the matrix factorization method

In [6]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [19]:
"""

Recommendation model using ALS
    ` rank = 10
    ` number of iteration = 10 

"""

# Rating constructor: Rating(int user, int product, double rating) 

rank = 10
numIterations = 10

# Python 3 does not allow tuple unpacking in lambda parameters, so index the Row
ratings = train_data.select("reviewerID_int", "asin_int", "score") \
    .rdd \
    .map(lambda r: Rating(int(r[0]), int(r[1]), float(r[2])))

# Build the recommendation model using ALS on the training data

model = ALS.train(ratings, rank, numIterations)

# Test model
model.recommendProducts(51442077, 5)

[Rating(user=51442077, product=61910371, rating=6.484451159851169),
 Rating(user=51442077, product=81170012, rating=6.421338327716319),
 Rating(user=51442077, product=28926953, rating=6.31559940767683),
 Rating(user=51442077, product=39736482, rating=6.251431946757155),
 Rating(user=51442077, product=90061849, rating=6.207537937659621)]

## 3. Model Evaluation 

In this part, we evaluate the recommendation model on both the training dataset and the test dataset. 
Root mean squared error (RMSE) is used as the evaluation metric.
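For reference, RMSE is the square root of the mean of the squared differences between actual and predicted ratings. A minimal plain-Python sketch follows. The function name and the sample pairs are made up for illustration and are not results from this notebook.

```python
from math import sqrt

def rmse(pairs):
    """Root mean squared error over (actual, predicted) pairs."""
    pairs = list(pairs)
    return sqrt(sum((actual - predicted) ** 2 for actual, predicted in pairs) / len(pairs))

# Squared errors are 1, 0 and 4, so the result is sqrt(5 / 3)
example = rmse([(5.0, 4.0), (3.0, 3.0), (1.0, 3.0)])
```

Since the errors are squared before averaging, a few large misses raise RMSE more than many small ones, and swapping the actual and predicted values does not change the result.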


In [20]:
from pyspark.mllib.evaluation import RegressionMetrics

In [21]:
# Evaluate the model on training data

X_train = train_data.select("reviewerID_int", "asin_int").rdd.map(tuple)
predictions = model.predictAll(X_train).map(lambda r: ((r[0], r[1]), r[2]))

# Join true scores with predictions on the (user, product) key.
# RMSE is symmetric, so the (label, prediction) order does not affect it.
labelsAndPreds = train_data.select("reviewerID_int", "asin_int", "score").rdd \
    .map(lambda r: ((r[0], r[1]), float(r[2]))) \
    .join(predictions) \
    .map(lambda kv: kv[1])

metrics = RegressionMetrics(labelsAndPreds)

rmse = metrics.rootMeanSquaredError

print("Root Mean Squared Error = " + str(rmse))

In [29]:
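# Evaluate the model on test data

X_test = test_data.select("reviewerID_int", "asin_int").rdd.map(tuple)
test_preds = model.predictAll(X_test).map(lambda r: ((r[0], r[1]), r[2]))

# Join true scores with predictions on the (user, product) key
test_labelsAndPreds = test_data.select("reviewerID_int", "asin_int", "score").rdd \
    .map(lambda r: ((r[0], r[1]), float(r[2]))) \
    .join(test_preds) \
    .map(lambda kv: kv[1])

test_metrics = RegressionMetrics(test_labelsAndPreds)

test_rmse = test_metrics.rootMeanSquaredError

print("Root Mean Squared Error = " + str(test_rmse))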

Root Mean Squared Error = 1.08251940251


In [None]:
"""
===> Need to try other hyperparameters to find the best model 
"""

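One way to organize the hyperparameter search mentioned above is a small grid-search helper. The sketch below is an assumption about how this could be done, not something the notebook ran: `grid_search`, `fake_score`, and the candidate values are hypothetical, and `fake_score` is a stand-in so the code runs without a Spark session.

```python
from itertools import product

def grid_search(train_and_score, ranks, iteration_counts, lambdas):
    """Return (best_score, best_params) for the lowest validation RMSE."""
    best = None
    for rank, iterations, lam in product(ranks, iteration_counts, lambdas):
        score = train_and_score(rank, iterations, lam)
        if best is None or score < best[0]:
            best = (score, {"rank": rank, "iterations": iterations, "lambda_": lam})
    return best

# With Spark available, train_and_score could wrap the calls used in this notebook:
#   model = ALS.train(ratings, rank, iterations, lambda_=lam)
#   ...then compute and return the RMSE on a held-out validation split.
# Stand-in scorer so the sketch runs on its own (NOT real results):
def fake_score(rank, iterations, lam):
    return abs(rank - 20) / 10 + abs(lam - 0.1) + 1.0 / iterations

best_score, best_params = grid_search(
    fake_score, ranks=[10, 20, 50], iteration_counts=[5, 10], lambdas=[0.01, 0.1, 1.0]
)
```

Scoring each candidate on a validation split carved out of the training data, rather than on the test set, keeps the final test RMSE an honest estimate.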
In [39]:
"""
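Save the model 
    ` Clean the directory before write
    
"""

import os
import shutil

MODEL_DIR = os.path.join(ROOT_DIR, 'model', 'als')

# Remove any previously saved model so that save() does not collide with existing files
# (assumes MODEL_DIR is on the local filesystem)
shutil.rmtree(MODEL_DIR, ignore_errors=True)

try:
    model.save(sc, MODEL_DIR)
    print("Save model successfully at", MODEL_DIR)
except Exception as e:
    # Report the failure instead of silently discarding it
    print("Failed to save model:", e)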

Save model successfully
