- https://medium.com/@patelneha1495/recommendation-system-in-python-using-als-algorithm-and-apache-spark-27aca08eaab3
- https://towardsdatascience.com/prototyping-a-recommender-system-step-by-step-part-2-alternating-least-square-als-matrix-4a76c58714a1

## Data format
- The data is stored as JSON with one review per line. See examples below for further help reading the data.

    - reviewerID - ID of the reviewer, e.g. A2SUAM1J3GNN3B
    - asin - ID of the product, e.g. 0000013714
    - reviewerName - name of the reviewer
    - vote - helpful votes of the review
    - style - a dictionary of the product metadata, e.g., "Format" is "Hardcover"
    - reviewText - text of the review
    - overall - rating of the product
    - summary - summary of the review
    - unixReviewTime - time of the review (unix time)
    - reviewTime - time of the review (raw)
    - image - images that users post after they have received the product
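
As a minimal sketch of reading this format without Spark, the snippet below parses a single review line with the standard `json` module. The sample line is hypothetical: only the field names and the two example IDs come from the list above, and the helper name `parse_review` is an assumption made for illustration.

```python
import json

# Hypothetical sample line; only the field names and the two example IDs
# are taken from the field list above.
sample_line = (
    '{"reviewerID": "A2SUAM1J3GNN3B", "asin": "0000013714", '
    '"overall": 5.0, "vote": "2", '
    '"style": {"Format:": " Hardcover"}, "unixReviewTime": 1399161600}'
)

def parse_review(line):
    """Parse one JSON review line into a dict.

    Optional fields that are absent from the line are filled with None,
    which mirrors the nulls Spark shows for them.
    """
    record = json.loads(line)
    for optional in ("vote", "style", "image", "reviewerName"):
        record.setdefault(optional, None)
    return record

review = parse_review(sample_line)
print(review["reviewerID"], review["asin"], review["overall"])
# A2SUAM1J3GNN3B 0000013714 5.0
```

Reading a whole file this way means applying `parse_review` to each line in turn; Spark's JSON reader does the equivalent in parallel.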

In [1]:
#### To measure all running time
# https://github.com/cpcloud/ipython-autotime

%load_ext autotime

In [2]:
import os
import time

# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import StringType, ArrayType
from pyspark.ml.evaluation import RegressionEvaluator
# from pyspark.ml.recommendation import ALS
from pyspark.mllib.recommendation import ALS

# data science imports
import math
import numpy as np
import pandas as pd

# visualization imports
import seaborn as sns
import matplotlib.pyplot as plt

import json

%matplotlib inline

time: 958 ms


In [3]:
# spark config
spark = SparkSession \
    .builder \
    .appName("movie recommendation") \
    .config("spark.driver.maxResultSize", "96g") \
    .config("spark.driver.memory", "96g") \
    .config("spark.executor.memory", "16g") \
    .config("spark.master", "local[12]") \
    .getOrCreate()
# spark = SparkSession \
#     .builder \
#     .appName("movie recommendation") \
#     .config("spark.driver.maxResultSize", "8g") \
#     .config("spark.driver.memory", "8g") \
#     .config("spark.executor.memory", "8g") \
#     .config("spark.master", "local[12]") \
#     .getOrCreate()

# get spark context
sc = spark.sparkContext

time: 2.98 s


- Download dataset from: http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Clothing_Shoes_and_Jewelry.json.gz

In [4]:
DATA_PATH = '../../Data_fulldata/Review/ClothingShoesAndJewelry/'
REVIEW_DATA = 'Clothing_Shoes_and_Jewelry.json.gz'

time: 530 µs


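1. Unzip `Clothing_Shoes_and_Jewelry.json.gz` to `Clothing_Shoes_and_Jewelry.json` and point `REVIEW_DATA` at the unzipped file. Spark can also read the `.gz` file directly, but a gzip file is not splittable, so it is read by a single task.
2. Load `Clothing_Shoes_and_Jewelry.json` (14.1 GB; 14,144,939,923 bytes).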

In [5]:
# header and inferSchema are CSV reader options; the JSON reader infers the schema on its own
ratings = spark.read.json(DATA_PATH + REVIEW_DATA)

time: 2min 8s


In [6]:
ratings.show(3)

+----------+-----+-------+--------------------+-----------+--------------+-------------+--------------------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID| reviewerName|               style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+-----------+--------------+-------------+--------------------+--------------------+--------------+--------+----+
|0871167042| null|    5.0|This book has bea...| 05 4, 2014|A2IC3NZN488KWK|   Ruby Tulip|[,,,,,,,,  Paperb...|      Unique designs|    1399161600|    true|   2|
|0871167042| null|    4.0|I love the ideas ...|04 26, 2014|A3OT9BYASFGU2X|    Laurie K.|[,,,,,,,,  Paperb...|makes you want to...|    1398470400|    true|null|
|0871167042| null|    5.0|As someone who ha...|04 17, 2014|A28GK1G2KDXHRP|Marie Rhoades|[,,,,,,,,  Paperb...|Highly Recommend ...|    1397692800|   false|   6|
+----------+-----+-------+--------------

In [7]:
type(ratings)

pyspark.sql.dataframe.DataFrame

time: 4.11 ms


In [8]:
print("Shape of Data", (ratings.count(), len(ratings.columns)))

Shape of Data (32292099, 12)
time: 1min 18s


## Drop and clean data
- Drop rows where `vote` is null

In [9]:
clean_ratings = ratings.na.drop(how='any', subset='vote')

time: 10.9 ms


In [10]:
print("Shape of Data", (clean_ratings.count(), len(clean_ratings.columns)))

Shape of Data (2886813, 12)
time: 1min 23s


### Spark SQL and OLAP



- What is the total number of reviews in the data set?

In [11]:
tmp = clean_ratings.count()
print('We have a total of {} review in the data sets'.format(tmp))

We have a total of 2886813 review in the data sets
time: 1min 27s


- What are the distinct values of `overall`?
    - overall - rating of the product

In [12]:
print('Distinct values of overall:')
print(sorted(clean_ratings.select('overall').distinct().rdd.map(lambda r: r[0]).collect()))

Distinct values of overall:
[1.0, 2.0, 3.0, 4.0, 5.0]
time: 1min 27s


- What are the distinct values of `vote`?
    - vote - helpful votes of the review

In [13]:
print('Distinct values of vote:')
print(sorted(clean_ratings.select('vote').distinct().rdd.map(lambda r: r[0]).collect()))

Distinct values of vote:
['1,000', '1,004', '1,008', '1,009', '1,012', '1,014', '1,024', '1,038', '1,052', '1,062', '1,066', '1,077', '1,091', '1,092', '1,115', '1,118', '1,125', '1,134', '1,151', '1,195', '1,199', '1,204', '1,223', '1,232', '1,241', '1,250', '1,272', '1,322', '1,332', '1,333', '1,354', '1,359', '1,376', '1,377', '1,409', '1,449', '1,464', '1,488', '1,494', '1,506', '1,556', '1,652', '1,693', '1,706', '1,790', '1,858', '1,861', '1,895', '1,922', '1,926', '1,939', '1,942', '1,960', '10', '100', '101', '102', '103', '104', '105', '106', '107', '108', '109', '11', '11,445', '11,578', '110', '111', '112', '113', '114', '115', '116', '117', '118', '119', '12', '120', '121', '122', '123', '124', '125', '126', '127', '128', '129', '13', '130', '131', '132', '133', '134', '135', '136', '137', '138', '139', '14', '140', '141', '142', '143', '144', '145', '146', '147', '148', '149', '15', '150', '151', '152', '153', '154', '155', '156', '157', '158', '159', '16', '160', '161', '
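
The output above shows that `vote` is stored as a string with thousands separators, which is why the sorted list starts with `'1,000'` rather than the smallest count. A minimal sketch of converting such strings to integers follows; `parse_vote` is a hypothetical helper, not part of the notebook. In Spark the same cleaning could presumably be done with `pyspark.sql.functions.regexp_replace` followed by a cast to integer, which is not tested here.

```python
def parse_vote(vote):
    """Convert a vote string such as '1,004' to an int; None stays None."""
    if vote is None:
        return None
    return int(vote.replace(",", ""))

votes = ["1,000", "10", "11,445", None]
print([parse_vote(v) for v in votes])
# [1000, 10, 11445, None]

# Lexicographic order (what the cell above produces) versus numeric order
print(sorted(["2", "10", "1,000"]))
# ['1,000', '10', '2']
print(sorted(["2", "10", "1,000"], key=parse_vote))
# ['2', '10', '1,000']
```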

- What is the minimum number of ratings per user and per product?

In [14]:
clean_ratings.show(2)

+----------+-----+-------+--------------------+-----------+--------------+-------------+--------------------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID| reviewerName|               style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+-----------+--------------+-------------+--------------------+--------------------+--------------+--------+----+
|0871167042| null|    5.0|This book has bea...| 05 4, 2014|A2IC3NZN488KWK|   Ruby Tulip|[,,,,,,,,  Paperb...|      Unique designs|    1399161600|    true|   2|
|0871167042| null|    5.0|As someone who ha...|04 17, 2014|A28GK1G2KDXHRP|Marie Rhoades|[,,,,,,,,  Paperb...|Highly Recommend ...|    1397692800|   false|   6|
+----------+-----+-------+--------------------+-----------+--------------+-------------+--------------------+--------------------+--------------+--------+----+
only showing top 2 rows

time: 167 ms


In [15]:
tmp1 = clean_ratings.groupBy("reviewerID").count().toPandas()['count'].min()
tmp2 = clean_ratings.groupBy("asin").count().toPandas()['count'].min()
print('For the users that rated product and the product that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per product is {}'.format(tmp2))

For the users that rated product and the product that were rated:
Minimum number of ratings per user is 1
Minimum number of ratings per product is 1
time: 3min 3s


- What is the
    1. maximum number of ratings per user,
    2. maximum number of ratings per product,
    3. maximum number of reviews that share the same vote value?

In [16]:
tmp1 = clean_ratings.groupBy("reviewerID").count().toPandas()['count'].max()
tmp2 = clean_ratings.groupBy("asin").count().toPandas()['count'].max()
tmp3 = clean_ratings.groupBy("vote").count().toPandas()['count'].max()
print('For the users that rated product and the product that were rated:')
print('Maximun number of ratings per user is {}'.format(tmp1))
print('Maximun number of ratings per product is {}'.format(tmp2))
print('Maximun number of ratings per vote is {}'.format(tmp3))

For the users that rated product and the product that were rated:
Maximun number of ratings per user is 186
Maximun number of ratings per product is 2493
Maximun number of ratings per vote is 1186954
time: 4min 43s


- How many products are rated by only one user?

In [17]:
tmp1 = sum(clean_ratings.groupBy("asin").count().toPandas()['count'] == 1)
tmp2 = clean_ratings.select('asin').distinct().count()
print('{} out of {} product are rated by only one user'.format(tmp1, tmp2))

290806 out of 631570 product are rated by only one user
time: 3min 23s


- What is the total number of distinct products in the data set?

In [18]:
tmp = clean_ratings.select('asin').distinct().count()
print('We have a total of {} distinct product in the data sets'.format(tmp))

We have a total of 631570 distinct product in the data sets
time: 1min 42s


#### Style column - How can we use it? How can we extract the "Format" value?
- Parsing the style column

    - Row(Capacity:=None, Color Name:=None, Color:=None, Design:=None, Diameter:=None, Edition:=None, Flavor Name:=None, Flavor:=None, Format:=' Paperback', Gem Type:=None, Grip Type:=None, Initial:=None, Item Display Length:=None, Item Package Quantity:=None, Length:=None, Material Type:=None, Material:=None, Metal Stamp:=None, Metal Type:=None, Model Number:=None, Number of Items:=None, Offer Type:=None, Package Quantity:=None, Package Type:=None, Pattern:=None, Primary Stone Gem Type:=None, Product Packaging:=None, Scent Name:=None, Shape:=None, Size Name:=None, Size per Pearl:=None, Size:=None, Stone Shape:=None, Style Name:=None, Style:=None, Team Name:=None, Total Diamond Weight:=None, Width:=None)

    - Row(Capacity:=None, Color Name:=None, Color:=None, Design:=None, Diameter:=None, Edition:=None, Flavor Name:=None, Flavor:=None, Format:=' Kindle Edition', Gem Type:=None, Grip Type:=None, Initial:=None, Item Display Length:=None, Item Package Quantity:=None, Length:=None, Material Type:=None, Material:=None, Metal Stamp:=None, Metal Type:=None, Model Number:=None, Number of Items:=None, Offer Type:=None, Package Quantity:=None, Package Type:=None, Pattern:=None, Primary Stone Gem Type:=None, Product Packaging:=None, Scent Name:=None, Shape:=None, Size Name:=None, Size per Pearl:=None, Size:=None, Stone Shape:=None, Style Name:=None, Style:=None, Team Name:=None, Total Diamond Weight:=None, Width:=None)
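
The two rows above suggest that most `style` attributes are empty, that the keys carry a trailing colon (`Format:`), and that the values carry leading whitespace. As a sketch under those assumptions, the hypothetical helper `clean_style` below tidies one such record once it is available as a plain Python dict. On the Spark side, something like `col("style").getField("Format:")` should select the same field, but that is not tested here.

```python
def clean_style(style):
    """Drop empty attributes, strip the trailing ':' from keys and
    surrounding whitespace from values."""
    if style is None:
        return {}
    return {
        key.rstrip(":").strip(): value.strip()
        for key, value in style.items()
        if value is not None
    }

# Abbreviated version of the first Row shown above
style = {"Capacity:": None, "Color:": None, "Format:": " Paperback", "Size:": None}

print(clean_style(style))
# {'Format': 'Paperback'}
print(clean_style(style).get("Format"))
# Paperback
```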


In [19]:
clean_ratings.select('style').toPandas().head(n=10).to_csv('test.csv')

time: 2min 7s


In [20]:
clean_ratings.show(2)

+----------+-----+-------+--------------------+-----------+--------------+-------------+--------------------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID| reviewerName|               style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+-----------+--------------+-------------+--------------------+--------------------+--------------+--------+----+
|0871167042| null|    5.0|This book has bea...| 05 4, 2014|A2IC3NZN488KWK|   Ruby Tulip|[,,,,,,,,  Paperb...|      Unique designs|    1399161600|    true|   2|
|0871167042| null|    5.0|As someone who ha...|04 17, 2014|A28GK1G2KDXHRP|Marie Rhoades|[,,,,,,,,  Paperb...|Highly Recommend ...|    1397692800|   false|   6|
+----------+-----+-------+--------------------+-----------+--------------+-------------+--------------------+--------------------+--------------+--------+----+
only showing top 2 rows

time: 67.2 ms


# Recommendation system in python using ALS algorithm and Apache Spark

### Terminologies:
A few terms need to be understood before moving forward.
1. Apache Spark: Apache Spark is an open-source, distributed, general-purpose cluster-computing framework. It can also be used with Hadoop.
2. Collaborative filtering: Collaborative filtering is a method of making automatic predictions (filtering) about the interests of a user by collecting preferences or taste information from many users. For example, if person A likes items 1, 2, and 3 and person B likes items 2, 3, and 4, then they have similar interests, so A is likely to like item 4 and B is likely to like item 1.
3. Alternating least squares (ALS) matrix factorization: The idea is to take a large (potentially huge) matrix and factor it into smaller matrices by alternating least squares. We end up with two lower-dimensional matrices whose product approximates the original one. ALS is built into Apache Spark.
4. PySpark: PySpark is the Python API for Apache Spark.
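
To make the "alternating" part of ALS concrete, here is a deliberately tiny sketch in plain Python. It assumes a single latent factor per user and item (rank 1), so each least-squares solve reduces to one division, and it uses made-up ratings that mirror the A/B example above. The names `ratings`, `LAMBDA`, `loss`, and `solve` are illustrative; Spark's ALS is a distributed, higher-rank implementation and is not reproduced here.

```python
# Observed ratings as {(user, item): rating}; the values are made up.
ratings = {
    ("A", 1): 5.0, ("A", 2): 4.0, ("A", 3): 5.0,
    ("B", 2): 4.0, ("B", 3): 5.0, ("B", 4): 4.0,
}
LAMBDA = 0.1  # L2 regularization strength (assumed value)

def loss(users, items):
    """Squared error on observed ratings plus the L2 penalty on all factors."""
    fit = sum((r - users[u] * items[i]) ** 2 for (u, i), r in ratings.items())
    reg = LAMBDA * (sum(x * x for x in users.values())
                    + sum(x * x for x in items.values()))
    return fit + reg

def solve(fixed, solve_users):
    """Closed-form ridge solution for each free factor, the others held fixed."""
    num, den = {}, {}
    for (u, i), r in ratings.items():
        free, other = (u, i) if solve_users else (i, u)
        num[free] = num.get(free, 0.0) + r * fixed[other]
        den[free] = den.get(free, LAMBDA) + fixed[other] ** 2
    return {k: num[k] / den[k] for k in num}

items = {i: 1.0 for (_, i) in ratings}  # arbitrary starting point
history = []
for _ in range(10):
    users = solve(items, solve_users=True)    # fix items, solve for users
    items = solve(users, solve_users=False)   # fix users, solve for items
    history.append(loss(users, items))

# Predicted rating for a pair that was never observed: user A, item 4
prediction = users["A"] * items[4]
print(round(prediction, 2), [round(h, 3) for h in history[:3]])
```

Because each half-step minimizes the objective exactly over one set of factors, the recorded loss never increases from one sweep to the next.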


In [21]:
# load data
df = spark.read.json(DATA_PATH+REVIEW_DATA)
df.show(50,truncate=True)

+----------+-----+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|               style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------------+--------------+--------+----+
|0871167042| null|    5.0|This book has bea...| 05 4, 2014|A2IC3NZN488KWK|          Ruby Tulip|[,,,,,,,,  Paperb...|      Unique designs|    1399161600|    true|   2|
|0871167042| null|    4.0|I love the ideas ...|04 26, 2014|A3OT9BYASFGU2X|           Laurie K.|[,,,,,,,,  Paperb...|makes you want to...|    1398470400|    true|null|
|0871167042| null|    5.0|As someone who ha...|04 17, 2014|A28GK1G2KDXHRP|       Marie Rhoades|[,,,,,,,,  Paperb...|Highly Recommend ...|    1397692800|   false|   6

- Select appropriate columns

In [22]:
nd=df.select(df['asin'],df['overall'],df['reviewerID'])
nd.show()

+----------+-------+--------------+
|      asin|overall|    reviewerID|
+----------+-------+--------------+
|0871167042|    5.0|A2IC3NZN488KWK|
|0871167042|    4.0|A3OT9BYASFGU2X|
|0871167042|    5.0|A28GK1G2KDXHRP|
|0871167042|    5.0|A3NFXFEKW8OK0E|
|0871167042|    5.0|A3I6G5TKBVJEK9|
|0871167042|    5.0|A1A7Y1M8AJWNZ8|
|0871167042|    5.0|A30FG02C424EJ5|
|0871167042|    5.0| ADQQYU1UCDEWB|
|0871167042|    5.0|A39YL2NXZORK56|
|0871167042|    5.0|A2PRY50ZESF1MH|
|0871167042|    5.0|A2G9GWQEWWNQUB|
|0871167042|    4.0|A3RGH15H17SM1Z|
|0871167042|    3.0|A20QJNRKLJVP1E|
|0871167042|    5.0|A1G26EYQGW3YF1|
|0871167042|    4.0|A2JGAZF2Y2BDU6|
|0871167042|    5.0|A3NI5OGW35SLY2|
|0871167042|    5.0|A1OPRA4NE56EV6|
|0871167042|    4.0|A3M6UXIK7XTA7A|
|0871167042|    5.0|A3I3B5OSB80ZXC|
|0871167042|    5.0| A62O7C5RQB353|
+----------+-------+--------------+
only showing top 20 rows

time: 72 ms


- Converting String to index

Spark's ALS implementation requires integer-valued user and item IDs, so the string columns `asin` and `reviewerID` must first be converted to numeric indices.
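
To illustrate what such an indexing step does, the sketch below maps strings to integers in plain Python, most frequent value first, which is the default ordering of Spark's `StringIndexer`. The helper `build_index` and the alphabetical tie-breaking are assumptions made for this example.

```python
from collections import Counter

def build_index(values):
    """Map each distinct string to an integer index, most frequent first.

    Ties are broken alphabetically here, which is an assumption of this sketch.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: idx for idx, value in enumerate(ordered)}

reviewer_ids = ["A2IC3NZN488KWK", "A3OT9BYASFGU2X",
                "A2IC3NZN488KWK", "A28GK1G2KDXHRP"]
index = build_index(reviewer_ids)

print(index["A2IC3NZN488KWK"])
# 0
print([index[r] for r in reviewer_ids])
# [0, 2, 0, 1]
```

Note that `StringIndexer` writes its indices as doubles (for example `122222.0`), so they still need a cast to `int` before being handed to an API that expects integer IDs.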

In [23]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(nd.columns)-set(['overall'])) ]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(nd).transform(nd)
transformed.show()

+----------+-------+--------------+----------+----------------+
|      asin|overall|    reviewerID|asin_index|reviewerID_index|
+----------+-------+--------------+----------+----------------+
|0871167042|    5.0|A2IC3NZN488KWK|  122222.0|        157794.0|
|0871167042|    4.0|A3OT9BYASFGU2X|  122222.0|       6460415.0|
|0871167042|    5.0|A28GK1G2KDXHRP|  122222.0|       7128913.0|
|0871167042|    5.0|A3NFXFEKW8OK0E|  122222.0|       8584184.0|
|0871167042|    5.0|A3I6G5TKBVJEK9|  122222.0|       7606980.0|
|0871167042|    5.0|A1A7Y1M8AJWNZ8|  122222.0|       5345257.0|
|0871167042|    5.0|A30FG02C424EJ5|  122222.0|       1004286.0|
|0871167042|    5.0| ADQQYU1UCDEWB|  122222.0|       3077653.0|
|0871167042|    5.0|A39YL2NXZORK56|  122222.0|       8082569.0|
|0871167042|    5.0|A2PRY50ZESF1MH|  122222.0|       6020161.0|
|0871167042|    5.0|A2G9GWQEWWNQUB|  122222.0|        439771.0|
|0871167042|    4.0|A3RGH15H17SM1Z|  122222.0|       5094237.0|
|0871167042|    3.0|A20QJNRKLJVP1E|  122

- Creating training and test data

In [24]:
# (training,test)=transformed.randomSplit([0.8, 0.2])

time: 188 µs


In [25]:
train, validation, test = transformed.randomSplit([0.6, 0.2, 0.2], seed=99)

# cache data
train.cache()
validation.cache()
test.cache()

DataFrame[asin: string, overall: double, reviewerID: string, asin_index: double, reviewerID_index: double]

time: 12.1 s


- Creating ALS model and fitting data

In [26]:
# als=ALS(maxIter=5,regParam=0.09,rank=25,userCol="reviewerID_index",itemCol="asin_index",ratingCol="overall",coldStartStrategy="drop",nonnegative=True)
# model=als.fit(training)

time: 266 µs


- Hyper-parameters in Alternating Least Squares (ALS):
    - num_iters: the maximum number of iterations to run (`maxIter` defaults to 10 in `pyspark.ml`; `iterations` defaults to 5 in `pyspark.mllib`'s `ALS.train`)

    - ranks: the number of latent factors in the model (`rank` defaults to 10 in `pyspark.ml`; `ALS.train` requires it explicitly)
        - The number of latent factors can be tuned via cross-validation.
        - **Latent factors are the features in the lower-dimensional latent space projected from the user-item interaction matrix.**
        - The idea behind matrix factorization is to use latent factors to represent user preferences or product characteristics in a much lower-dimensional space.
        - Matrix factorization is a very effective dimensionality-reduction technique in machine learning.
    - reg_param: the regularization parameter in ALS (`regParam` defaults to 0.1 in `pyspark.ml`; `lambda_` defaults to 0.01 in `ALS.train`)
        - A common strategy to avoid overfitting is to add regularization terms to the objective function.
        - ALS differs from Funk SVD mainly in how the objective is optimized: both penalize the factors with L2 regularization, but Funk SVD uses stochastic gradient descent, while ALS alternately fixes one factor matrix and solves a regularized least-squares problem for the other.
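
The regularized objective mentioned in the last bullet can be written out explicitly. This is the textbook form, given as a sketch: the symbols ($r_{ui}$ for an observed rating, $x_u$ and $y_i$ for the user and item factor vectors, $\mathcal{K}$ for the set of observed pairs, $\lambda$ for `reg_param`, $k$ for the rank) are notation introduced here. Spark's documentation describes an additional scaling of the regularization by the number of ratings, which is omitted.

```latex
\min_{X,\,Y} \; \sum_{(u,i) \in \mathcal{K}} \left( r_{ui} - x_u^{\top} y_i \right)^2
  + \lambda \left( \sum_{u} \lVert x_u \rVert_2^2 + \sum_{i} \lVert y_i \rVert_2^2 \right)

% With the item factors fixed, each user vector has a closed-form ridge solution
% (and symmetrically for each item vector with the user factors fixed):
x_u = \Bigl( \sum_{i \in \mathcal{K}_u} y_i y_i^{\top} + \lambda I_k \Bigr)^{-1}
      \sum_{i \in \mathcal{K}_u} r_{ui} \, y_i
```

Here $\mathcal{K}_u$ denotes the items rated by user $u$. Alternating between the two closed-form updates is what gives the algorithm its name.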
        

In [27]:
def train_ALS(train_data, validation_data, num_iters, reg_param, ranks):
    """
    Grid search that selects the best model based on the RMSE of the
    validation data.

    Parameters
    ----------
    train_data: spark DF with columns ['reviewerID_index', 'asin_index', 'overall']

    validation_data: spark DF with the same columns as train_data

    num_iters: int, max number of learning iterations

    reg_param: list of float, one dimension of hyper-param tuning grid

    ranks: list of int, one dimension of hyper-param tuning grid

    Return
    ------
    The best fitted ALS model with lowest RMSE score on validation data
    """

    def to_rating_rdd(df):
        # pyspark.mllib ALS expects an RDD of (userID, productID, rating)
        # tuples with integer IDs; StringIndexer produces doubles, so cast.
        return (df.select('reviewerID_index', 'asin_index', 'overall')
                  .rdd
                  .map(lambda r: (int(r[0]), int(r[1]), float(r[2]))))

    train_rdd = to_rating_rdd(train_data).cache()
    valid_rdd = to_rating_rdd(validation_data).cache()

    # (user, product) pairs to score, and the true ratings keyed by that pair
    valid_pairs = valid_rdd.map(lambda p: (p[0], p[1]))
    valid_truth = valid_rdd.map(lambda r: ((r[0], r[1]), r[2]))

    # initial
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None

    for rank in ranks:
        for reg in reg_param:
            # train ALS model
            model = ALS.train(
                ratings=train_rdd,     # (userID, productID, rating) tuple
                iterations=num_iters,
                rank=rank,
                lambda_=reg,           # regularization param
                seed=99)

            # make prediction
            predictions = model.predictAll(valid_pairs).map(lambda r: ((r[0], r[1]), r[2]))

            # join true and predicted ratings on (user, product)
            ratesAndPreds = valid_truth.join(predictions)

            # get the RMSE
            MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
            error = math.sqrt(MSE)
            print('{} latent factors and regularization = {}: validation RMSE is {}'.format(rank, reg, error))
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization = reg
                best_model = model

    print('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
    return best_model

time: 1.86 ms
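
The scoring step inside the grid search joins true and predicted ratings on the (user, product) pair and takes the root of the mean squared difference. The plain-Python sketch below shows the same computation on made-up numbers; the helper name `rmse` and the sample dicts are assumptions for illustration only.

```python
import math

def rmse(actual, predicted):
    """RMSE over the (user, product) keys present in both dicts (an inner join)."""
    shared = actual.keys() & predicted.keys()
    if not shared:
        raise ValueError("no overlapping (user, product) pairs")
    mse = sum((actual[k] - predicted[k]) ** 2 for k in shared) / len(shared)
    return math.sqrt(mse)

# Made-up ratings keyed by (user index, product index)
actual = {(0, 10): 5.0, (0, 11): 3.0, (1, 10): 4.0}
# (1, 10) has no prediction, e.g. because user 1 never appeared in training
predicted = {(0, 10): 4.0, (0, 11): 3.0}

print(rmse(actual, predicted))
# 0.7071067811865476
```

Pairs without a prediction silently drop out of the join, so the RMSE is computed only over users and products that were seen during training.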


In [28]:
# hyper-param config
num_iterations = 10
ranks = [8, 10, 12, 14, 16, 18, 20]
reg_params = [0.001, 0.01, 0.05, 0.1, 0.2]

# grid search and select best model
start_time = time.time()
final_model = train_ALS(train, validation, num_iterations, reg_params, ranks)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 44972)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/jh/anaconda3/envs/movie_recomm/lib/python3.7/site-packages/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jh/anaconda3/envs/movie_recomm/lib/python3.7/site-packages/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/home/jh/anaconda3/envs/movie_recomm/lib/python3.7/site-packages/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
  File "/home/jh

Py4JError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob

time: 4min 23s
