# Using PySpark to Create an Amazon Review Recommendation System 



This Jupyter Notebook contains code to create a recommendation system for Amazon user reviews on specific products using PySpark.  It was created as a final project for the class INFO 607: Applied Database Technologies at Drexel University.  The data was downloaded from [here](https://jmcauley.ucsd.edu/data/amazon/).  

Additional documentation on this project can be found in the GitHub repository [here](https://github.com/zachcarlson/ProductRecommender).

## Configuration

We recommend running this notebook in Google Colab using a local runtime and your GPU. The following resources explain how to set up this configuration (see also this [Stack Overflow thread](https://stackoverflow.com/questions/51002045/how-to-make-jupyter-notebook-to-run-on-gpu)):
- [Local Runtime](https://research.google.com/colaboratory/local-runtimes.html)
- [Utilizing GPU](https://medium.com/deep-learning-turkey/google-colab-free-gpu-tutorial-e113627b9f5d)

Configure your input directory below:

In [None]:
INPUT_DIRECTORY = "/content/drive/MyDrive/Grad School/INFO 607/ProductRecommender/data/" #for google mount
# INPUT_DIRECTORY = "ProductRecommender/data/" #for jupyter notebook

### Google Colab Hosted Runtime

**NOTE**: Because of the limited resources on Google Colab's Free Tier, this notebook might not run if you open it from Google Drive on a Hosted Runtime; we recommend a Google Colab Local Runtime instead. If you have Colab Pro/Pro+, the notebook *might* work: run the cells below (uncommenting the `pip install` line) to continue with that configuration.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# !pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=7494ebf7b54072e02bc73cb188b3d5e42d3f3ea828180c1ae77d8eb0f9d7b597
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


The cell below may take 1-2 minutes to execute:

In [None]:
%%capture 
#prevent large printout with %%capture

#Install Java 8, which Spark requires
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

#Download Apache Spark 3.2.1 pre-built for Hadoop 3.2
#(older releases are hosted on archive.apache.org, not dlcdn.apache.org)
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

#Extract the archive
!tar xvf spark-3.2.1-bin-hadoop3.2.tgz

#Install findspark and pyspark 3.2.1 (matching the Spark download)
!pip install -q findspark
!pip install pyspark==3.2.1

#Set environment variables so findspark can locate Java and Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = os.path.abspath("spark-3.2.1-bin-hadoop3.2")

### Google Colab Local Runtime

We recommend using a local Jupyter Notebook, as it is much faster for free users; however, it requires some additional configuration. Follow [this tutorial](https://changhsinlee.com/install-pyspark-windows-jupyter/).

In [None]:
import findspark
findspark.init()

## Load Packages

In [None]:
import pandas as pd
import pyspark.sql.functions as F

## Data Acquisition, Preprocessing

### Import Data

In [None]:
#create SparkSession and SparkContext objects
from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = SparkContext.getOrCreate()


In [None]:
#Import data via file
file_path = INPUT_DIRECTORY + "ratings_Amazon_Instant_Video.csv"
ratings = spark.read.csv(file_path, header=False, inferSchema=True)
ratings.show(5)


#Import data via URL

#uncomment the lines below to use this method instead
# from pyspark import SparkFiles
# url = "http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/ratings_Amazon_Instant_Video.csv"
# spark.sparkContext.addFile(url)
# ratings = spark.read.csv(SparkFiles.get('ratings_Amazon_Instant_Video.csv'), header=False, inferSchema=True)

+--------------+----------+---+----------+
|           _c0|       _c1|_c2|       _c3|
+--------------+----------+---+----------+
|A1EE2E3N7PW666|B000GFDAUG|5.0|1202256000|
| AGZ8SM1BGK3CK|B000GFDAUG|5.0|1198195200|
|A2VHZ21245KBT7|B000GIOPK2|4.0|1215388800|
| ACX8YW2D5EGP6|B000GIOPK2|4.0|1185840000|
| A9RNMO9MUSMTJ|B000GIOPK2|2.0|1281052800|
+--------------+----------+---+----------+
only showing top 5 rows



In [None]:
ratings.count()

583933

### Pre-processing

#### **Rename columns**

In [None]:
ratings = ratings.withColumnRenamed("_c0", "reviewerID") \
                  .withColumnRenamed("_c1", "productID") \
                  .withColumnRenamed("_c2", "rating") \
                  .withColumnRenamed("_c3", "timestamp")
ratings.show(5)

+--------------+----------+------+----------+
|    reviewerID| productID|rating| timestamp|
+--------------+----------+------+----------+
|A1EE2E3N7PW666|B000GFDAUG|   5.0|1202256000|
| AGZ8SM1BGK3CK|B000GFDAUG|   5.0|1198195200|
|A2VHZ21245KBT7|B000GIOPK2|   4.0|1215388800|
| ACX8YW2D5EGP6|B000GIOPK2|   4.0|1185840000|
| A9RNMO9MUSMTJ|B000GIOPK2|   2.0|1281052800|
+--------------+----------+------+----------+
only showing top 5 rows



#### **Check datatypes**

In [None]:
ratings.printSchema()

root
 |-- reviewerID: string (nullable = true)
 |-- productID: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



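Spark's ALS implementation requires integer user and item IDs (values must fit in a 32-bit integer), but `reviewerID` and `productID` are strings. We'll create separate lookup tables for `reviewers` and `products` that assign each distinct string ID an integer, and at the end of the pre-processing section we'll join them back onto the ratings.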
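Conceptually, each lookup table assigns every distinct string ID a dense integer. A minimal plain-Python sketch of that idea, using a few IDs from the ratings preview above (`build_index` is a hypothetical helper; Spark's `StringIndexer` in `pyspark.ml.feature` is a built-in alternative, though by default it orders labels by frequency rather than by first appearance):

```python
def build_index(values):
    """Map each distinct string to a dense integer ID in order of first appearance."""
    index = {}
    for v in values:
        if v not in index:
            index[v] = len(index)  # next unused integer
    return index

# (reviewerID, productID) pairs taken from the ratings preview
rows = [
    ("A1EE2E3N7PW666", "B000GFDAUG"),
    ("AGZ8SM1BGK3CK", "B000GFDAUG"),
    ("A2VHZ21245KBT7", "B000GIOPK2"),
    ("ACX8YW2D5EGP6", "B000GIOPK2"),
    ("A9RNMO9MUSMTJ", "B000GIOPK2"),
]
user_index = build_index(r for r, _ in rows)
item_index = build_index(p for _, p in rows)

# Replace string IDs with integer IDs, as the join step does for the full table
int_rows = [(user_index[r], item_index[p]) for r, p in rows]
print(int_rows)  # [(0, 0), (1, 0), (2, 1), (3, 1), (4, 1)]
```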

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

reviewers = ratings.select("reviewerID").distinct().coalesce(1)
reviewers.show(5)

+--------------+
|    reviewerID|
+--------------+
|A3FF40QHATHVK0|
|A2OYUAR8I1QT2O|
|A28DPR143MALN7|
|A1RNJ6Q36443HL|
| A2YJL1MX3J4OK|
+--------------+
only showing top 5 rows



In [None]:
reviewers = reviewers.withColumn("userID", monotonically_increasing_id()).persist()
reviewers.show(5)

+--------------+------+
|    reviewerID|userID|
+--------------+------+
|A3FF40QHATHVK0|     0|
|A2OYUAR8I1QT2O|     1|
|A28DPR143MALN7|     2|
|A1RNJ6Q36443HL|     3|
| A2YJL1MX3J4OK|     4|
+--------------+------+
only showing top 5 rows



In [None]:
products = ratings.select("productID").distinct().coalesce(1)
products = products.withColumn("product_ID", monotonically_increasing_id()).persist()
products.show(5)

+----------+----------+
| productID|product_ID|
+----------+----------+
|B000OC3FZQ|         0|
|B000P41FAA|         1|
|B000RKQEQW|         2|
|B000TS73MG|         3|
|B000U5IH7I|         4|
+----------+----------+
only showing top 5 rows
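The `coalesce(1)` calls above matter because of how `monotonically_increasing_id()` builds its values: per the PySpark documentation, the partition ID goes in the upper 31 bits and the record number within the partition in the lower 33 bits. Only with a single partition are the IDs consecutive and small enough for ALS's 32-bit integer columns. A plain-Python sketch of that layout (`mono_id` is a hypothetical stand-in, not a Spark function):

```python
INT32_MAX = 2**31 - 1  # largest ID value ALS can use

def mono_id(partition_index, record_number):
    """Mimic monotonically_increasing_id(): partition in the upper 31 bits, record in the lower 33."""
    return (partition_index << 33) | record_number

# Single partition (what coalesce(1) gives): IDs are 0, 1, 2, ...
single = [mono_id(0, n) for n in range(5)]
print(single)  # [0, 1, 2, 3, 4]

# Two partitions: the first row of partition 1 jumps past the 32-bit range
first_of_p1 = mono_id(1, 0)
print(first_of_p1, first_of_p1 > INT32_MAX)  # 8589934592 True
```

The documentation also describes the function as non-deterministic (its result depends on partitioning), so persisting the lookup tables before the joins is a sensible precaution to keep the IDs stable.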



#### **No duplicate ratings**

In [None]:
ratings.groupby("reviewerID", "productID").count().select(F.max("count")).show()

+----------+
|max(count)|
+----------+
|         1|
+----------+



Each user has only one rating per product, so filtering on `timestamp` to remove duplicate ratings is not needed. We'll keep the timestamp for EDA and to allow future filtering if a dataset contains multiple ratings by the same user for the same product.
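If a future dataset did contain repeated ratings, one option is to keep only the most recent rating per reviewer/product pair. The sketch below shows that logic in plain Python on made-up rows (`keep_latest` is a hypothetical helper). In PySpark the same idea is commonly written with a window, e.g. `Window.partitionBy("reviewerID", "productID").orderBy(F.desc("timestamp"))` combined with `F.row_number()`, keeping the rows numbered 1.

```python
def keep_latest(rows):
    """rows: (reviewerID, productID, rating, timestamp); keep the newest rating per pair."""
    latest = {}
    for reviewer, product, rating, ts in rows:
        key = (reviewer, product)
        # Replace the stored rating only if this one is more recent
        if key not in latest or ts > latest[key][1]:
            latest[key] = (rating, ts)
    return [(r, p, rating, ts) for (r, p), (rating, ts) in latest.items()]

# Hypothetical data: reviewer "A" rated product "P1" twice
rows = [
    ("A", "P1", 2.0, 1200000000),
    ("A", "P1", 4.0, 1300000000),
    ("B", "P1", 5.0, 1250000000),
]
print(keep_latest(rows))  # [('A', 'P1', 4.0, 1300000000), ('B', 'P1', 5.0, 1250000000)]
```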

#### **Join tables**

In [None]:
#Join ratings table with new integer IDs for products and reviewers
product_ratings = ratings.join(reviewers, on="reviewerID", how="left")
product_ratings = product_ratings.join(products, on="productID", how="left")

#select just integer IDs, rating and timestamp
product_ratings = product_ratings.select("userID", "product_ID", "rating", "timestamp")
product_ratings.show(5)

+------+----------+------+----------+
|userID|product_ID|rating| timestamp|
+------+----------+------+----------+
|178680|     21596|   4.0|1402358400|
|101878|     21596|   5.0|1392854400|
| 84105|     21596|   4.0|1392508800|
|195647|     21596|   4.0|1392163200|
| 77065|      9656|   5.0|1203897600|
+------+----------+------+----------+
only showing top 5 rows



In [None]:
#rename columns for readability
product_ratings = product_ratings.withColumnRenamed("userID", "reviewerID")
product_ratings = product_ratings.withColumnRenamed("product_ID", "productID")
product_ratings.show(5)

+----------+---------+------+----------+
|reviewerID|productID|rating| timestamp|
+----------+---------+------+----------+
|    178680|    21596|   4.0|1402358400|
|    101878|    21596|   5.0|1392854400|
|     84105|    21596|   4.0|1392508800|
|    195647|    21596|   4.0|1392163200|
|     77065|     9656|   5.0|1203897600|
+----------+---------+------+----------+
only showing top 5 rows



## Sampling dataset

**FIXME** Might remove. This dataset has 583,933 ratings, so a 20% sample would be roughly 116,787 rows. PySpark's `sample()` is not guaranteed to return exactly that fraction of the rows, so the actual count may be somewhat higher or lower.
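To see why the sample size only approximates 20%, here is a plain-Python simulation that assumes per-row Bernoulli sampling (each row kept independently with probability `fraction`, which is how Spark samples without replacement), applied to the 583,933 ratings counted earlier. `bernoulli_sample_size` is a hypothetical helper for illustration.

```python
import math
import random

def bernoulli_sample_size(n_rows, fraction, seed):
    """Keep each row independently with probability `fraction`; return how many were kept."""
    rng = random.Random(seed)
    return sum(1 for _ in range(n_rows) if rng.random() < fraction)

n, p = 583_933, 0.2
expected = n * p                      # about 116,787 rows
std_dev = math.sqrt(n * p * (1 - p))  # about 306 rows
sizes = [bernoulli_sample_size(n, p, seed) for seed in range(3)]
print(f"expected ~{expected:,.0f} ± {std_dev:,.0f}; simulated: {sizes}")
```

Each seed gives a slightly different count, typically within a few hundred rows of the expected value.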

In [None]:
# ratings_sample = product_ratings.sample(0.2, seed = 0)
# ratings_sample.count()

## EDA

Find reviewers with the most ratings:

In [None]:
product_ratings.select("reviewerID", "productID", "rating")\
        .groupby("reviewerID")\
        .count()\
        .sort("count", ascending = False)\
        .show(5)

+----------+-----+
|reviewerID|count|
+----------+-----+
|    314359|  277|
|    320548|  240|
|    384298|  212|
|    173088|  142|
|    115356|  125|
+----------+-----+
only showing top 5 rows



Find products with the most ratings:

In [None]:
product_ratings.select("reviewerID", "productID", "rating")\
        .groupby("productID")\
        .count()\
        .sort("count", ascending = False)\
        .show(5)

+---------+-----+
|productID|count|
+---------+-----+
|      452|12633|
|    15400|10938|
|     3875|10226|
|    23936| 8676|
|    20401| 6927|
+---------+-----+
only showing top 5 rows



Count and average the ratings for each product:

In [None]:
avg_ratings = (product_ratings
                .select("productID", "rating")              # Select Columns
                .groupby("productID")                       # Group by productID
                .agg(                           
                     F.count("rating").alias("Count"),      # Count number of ratings
                     F.avg("rating").alias("Average")       # Average ratings for each product
                     )
                .sort("Average", "Count", ascending = [False, False]) # Sort results by average and count
            )
avg_ratings.show(5)

+---------+-----+-------+
|productID|Count|Average|
+---------+-----+-------+
|    13490|   41|    5.0|
|     9406|   22|    5.0|
|    19381|   20|    5.0|
|     4990|   19|    5.0|
|    13295|   18|    5.0|
+---------+-----+-------+
only showing top 5 rows



In [None]:
low_avg_rating = avg_ratings.filter(avg_ratings.Average < 2)
low_avg_rating.show(5)

product_num = avg_ratings.select("productID").distinct().count()
lar_count = low_avg_rating.count()
print(f"Number of distinct products: {product_num:,}")
print(f"Number of products with low average (less than 2): {lar_count:,}")
print(f"% low ratings: {lar_count / product_num * 100:,.2f}")


+---------+-----+------------------+
|productID|Count|           Average|
+---------+-----+------------------+
|    15484|   49|1.9795918367346939|
|    10601|   27| 1.962962962962963|
|    14583|   18|1.9444444444444444|
|    20019|   15|1.9333333333333333|
|    15959|   15|1.9333333333333333|
+---------+-----+------------------+
only showing top 5 rows

Number of distinct products: 23,965
Number of products with low average (less than 2): 2,124
% low ratings: 8.86


Only a small share of products (2,124 of 23,965, or 8.86%) have an average rating below 2.

## Recommendation System
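For intuition about what the ALS estimator does, here is a minimal NumPy sketch of explicit-feedback alternating least squares: it alternately solves a regularized least-squares problem for each user's factor vector with the item factors fixed, then for each item's vector with the user factors fixed. Following the "ALS-WR" convention described in Spark's collaborative filtering guide, the regularization is scaled by the number of ratings each user or item has. This is not Spark's implementation (no partitioning, no `nonnegative` constraint), and the toy ratings are invented.

```python
import numpy as np

def als(ratings, rank=2, reg=0.1, iters=20, seed=0):
    """ratings: list of (user, item, rating) with 0-based integer IDs."""
    n_users = 1 + max(u for u, _, _ in ratings)
    n_items = 1 + max(i for _, i, _ in ratings)
    rng = np.random.default_rng(seed)
    X = rng.normal(scale=0.1, size=(n_users, rank))  # user factors
    Y = rng.normal(scale=0.1, size=(n_items, rank))  # item factors

    by_user = {u: [] for u in range(n_users)}
    by_item = {i: [] for i in range(n_items)}
    for u, i, r in ratings:
        by_user[u].append((i, r))
        by_item[i].append((u, r))

    def solve(fixed, observed):
        # Ridge regression for one row; reg scaled by its number of ratings (ALS-WR)
        idx = [j for j, _ in observed]
        target = np.array([r for _, r in observed], dtype=float)
        F = fixed[idx]
        A = F.T @ F + reg * len(idx) * np.eye(rank)
        return np.linalg.solve(A, F.T @ target)

    for _ in range(iters):
        for u, obs in by_user.items():
            if obs:
                X[u] = solve(Y, obs)  # update users with items fixed
        for i, obs in by_item.items():
            if obs:
                Y[i] = solve(X, obs)  # update items with users fixed
    return X, Y

def training_rmse(ratings, X, Y):
    return float(np.sqrt(np.mean([(r - X[u] @ Y[i]) ** 2 for u, i, r in ratings])))

# Toy data: users 0-1 like items 0-1, users 2-3 like items 2-3 (two cells unrated)
toy = [(0, 0, 5), (0, 1, 5), (0, 2, 1), (0, 3, 1),
       (1, 0, 5), (1, 2, 1), (1, 3, 1),
       (2, 0, 1), (2, 1, 1), (2, 2, 5), (2, 3, 5),
       (3, 0, 1), (3, 1, 1), (3, 3, 5)]
X, Y = als(toy)
print(f"training RMSE: {training_rmse(toy, X, Y):.3f}")
print(f"predicted rating for user 1, item 1: {X[1] @ Y[1]:.2f}")
```

User 1 never rated item 1, but because user 1's other ratings match user 0's, the learned factors predict a high rating for that cell; this is the collaborative-filtering effect the Spark model exploits at scale.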

Now we'll build a collaborative-filtering recommender with Spark's ALS (alternating least squares) estimator:

In [None]:
from pyspark.ml.recommendation import ALS

# Initialize ALS with parameters
als = ALS(userCol="reviewerID", itemCol="productID", ratingCol="rating",
          nonnegative=True, coldStartStrategy="drop", implicitPrefs=False)
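`coldStartStrategy="drop"` matters because a random train/test split can leave reviewers or products in the test set that never appear in training. ALS cannot predict for them (by default Spark returns NaN), and a single NaN would make the RMSE NaN; with `"drop"`, those rows are removed from the predictions. A plain-Python sketch of that filtering on made-up IDs (`drop_cold_start` is a hypothetical helper):

```python
def drop_cold_start(train, test):
    """Keep only test rows whose user and item both appear in the training data."""
    seen_users = {u for u, _, _ in train}
    seen_items = {i for _, i, _ in train}
    return [(u, i, r) for u, i, r in test if u in seen_users and i in seen_items]

# Hypothetical (reviewerID, productID, rating) rows with integer IDs
train = [(0, 10, 5.0), (1, 10, 4.0), (1, 11, 2.0)]
test = [(0, 11, 3.0),   # both seen: kept
        (2, 10, 5.0),   # reviewer 2 unseen: dropped
        (1, 12, 1.0)]   # product 12 unseen: dropped
print(drop_cold_start(train, test))  # [(0, 11, 3.0)]
```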

Next, we'll define the hyperparameter grid with `ParamGridBuilder`:

In [None]:
from pyspark.ml.tuning import ParamGridBuilder

param_grid = ParamGridBuilder() \
                  .addGrid(als.rank, [5, 20]) \
                  .addGrid(als.maxIter, [5]) \
                  .addGrid(als.regParam, [0.01, 0.05, 1]) \
                  .build()
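`ParamGridBuilder` takes the Cartesian product of the values passed to each `addGrid` call. The same combinations, and their count, can be checked in plain Python with `itertools.product`:

```python
from itertools import product

# Same candidate values as the grid above
ranks = [5, 20]
max_iters = [5]
reg_params = [0.01, 0.05, 1]

combos = list(product(ranks, max_iters, reg_params))
for rank, max_iter, reg in combos:
    print(f"rank={rank}, maxIter={max_iter}, regParam={reg}")
print(len(combos))  # 6 = 2 * 1 * 3
```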

Now we'll build our evaluator and use RMSE as the performance metric:

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# Define evaluator
reg_eval = RegressionEvaluator(metricName = "rmse",
                               predictionCol = "prediction",
                               labelCol = "rating")

print(f"Num models to be tested: {len(param_grid)}")

Num models to be tested: 6
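RMSE is the square root of the mean squared difference between actual and predicted ratings, so it is expressed in the same units as the ratings (stars). A plain-Python version of what `RegressionEvaluator(metricName="rmse")` computes, with a hypothetical `rmse` helper:

```python
import math

def rmse(pairs):
    """pairs: (actual_rating, predicted_rating); sqrt(mean((actual - predicted)^2))."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Off by 2 stars on one rating and exact on the other: sqrt((4 + 0) / 2) = sqrt(2)
print(rmse([(5.0, 3.0), (4.0, 4.0)]))  # 1.4142135623730951
```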


Creating `CrossValidator`:

In [None]:
from pyspark.ml.tuning import CrossValidator

cv = CrossValidator(estimator = als, 
                    estimatorParamMaps= param_grid,
                    evaluator = reg_eval,
                    numFolds = 5)
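With 6 parameter combinations and `numFolds = 5`, `CrossValidator` trains 6 × 5 = 30 ALS models, averages the RMSE across folds for each combination, picks the combination with the lowest average (RMSE is smaller-is-better), and then refits that combination once more on the full training set. A plain-Python sketch of the selection loop, assuming random fold assignment; `fit_and_score` is a hypothetical stand-in for "fit ALS on the other folds and return RMSE on the held-out fold":

```python
import random
from statistics import mean

def cross_validate(rows, param_grid, fit_and_score, num_folds=5, seed=0):
    """Return (best_params, avg_scores), where lower scores (e.g. RMSE) are better."""
    rng = random.Random(seed)
    fold_of = [rng.randrange(num_folds) for _ in rows]  # random fold per row
    avg_scores = []
    for params in param_grid:
        scores = []
        for k in range(num_folds):
            train = [r for r, f in zip(rows, fold_of) if f != k]
            valid = [r for r, f in zip(rows, fold_of) if f == k]
            scores.append(fit_and_score(params, train, valid))
        avg_scores.append(mean(scores))
    best = min(range(len(param_grid)), key=avg_scores.__getitem__)
    return param_grid[best], avg_scores

# Hypothetical scorer that ignores the data and favors regParam=0.05
calls = []
def fake_fit_and_score(params, train, valid):
    calls.append(params)
    return abs(params["regParam"] - 0.05)

grid = [{"rank": r, "regParam": g} for r in (5, 20) for g in (0.01, 0.05, 1)]
best, scores = cross_validate(list(range(100)), grid, fake_fit_and_score)
print(best, len(calls))  # {'rank': 5, 'regParam': 0.05} 30
```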

Now, we can fit our training data:

### Sampled dataframe

In [None]:
# # Split data into 80% train, 20% test
# training_data, test_data = ratings_sample.randomSplit([0.8, 0.2], seed = 0)

# # Training model
# model = cv.fit(training_data)

# # Get best model
# best_model = model.bestModel

### Full dataframe

In [None]:
# Split data into 80% train, 20% test
training_data, test_data = product_ratings.randomSplit([0.8, 0.2], seed = 0)

# Training model
model = cv.fit(training_data)

# Get best model
best_model = model.bestModel

In [None]:
print(type(best_model))

print("\n**Best Model**")
print("  Rank:", best_model.rank)
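# maxIter and regParam are not exposed on ALSModel in PySpark, so read them
# from the parent ALS estimator through the private _java_obj handle
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
print("  RegParam:", best_model._java_obj.parent().getRegParam())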

<class 'pyspark.ml.recommendation.ALSModel'>

**Best Model**
  Rank: 5
  MaxIter: 5
  RegParam: 1.0


Now we can evaluate our model's performance on the test data:

In [None]:
# Predict ratings using trained model
predictions = best_model.transform(test_data)
predictions.show(5)

+----------+---------+------+----------+----------+
|reviewerID|productID|rating| timestamp|prediction|
+----------+---------+------+----------+----------+
|        15|    20782|   5.0|1385251200| 2.5404944|
|        15|    22391|   5.0|1385251200|  2.975179|
|        22|    16833|   5.0|1383955200| 3.7255514|
|        22|    17391|   5.0|1403913600| 3.6304004|
|        41|     3875|   5.0|1393372800| 3.2725685|
+----------+---------+------+----------+----------+
only showing top 5 rows



In [None]:
# Compute RMSE on the test-set predictions
RMSE = reg_eval.evaluate(predictions)

# Print the RMSE
print(RMSE)

1.7372979997729052
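One way to put this RMSE in context (not part of the original analysis) is to compare it against a trivial baseline that predicts the mean training rating for every test row; the recommender only adds value if it beats that number. In PySpark this could be done by computing the mean with `training_data.agg(F.avg("rating"))`, adding it as a constant `prediction` column with `F.lit`, and passing the result to `reg_eval`, ideally on the same rows that survived `coldStartStrategy="drop"`. A plain-Python sketch of the baseline on made-up ratings (`mean_baseline_rmse` is a hypothetical helper):

```python
import math

def mean_baseline_rmse(train_ratings, test_ratings):
    """RMSE of always predicting the mean training rating."""
    mean_rating = sum(train_ratings) / len(train_ratings)
    return math.sqrt(sum((r - mean_rating) ** 2 for r in test_ratings) / len(test_ratings))

# Hypothetical star ratings
train_ratings = [5.0, 4.0, 5.0, 2.0]   # mean = 4.0
test_ratings = [5.0, 3.0]              # errors of +1 and -1
print(mean_baseline_rmse(train_ratings, test_ratings))  # 1.0
```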


## Conclusions