# CS 614 - Applications of Machine Learning
## Nick Babcock
### Code for Assignment 2 - Recommender Systems

This notebook uses the ALS (alternating least squares) implementation in PySpark, the Python API for Apache Spark, to build a recommender system trained on Amazon users' ratings of Amazon Fashion products.

#### Starting a PySpark Session

In [None]:
# installing the pyspark package

!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
     310.8/310.8 MB 2.9 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=3f33b21f1af7a6f1d4fc7031be2de61551cdedd012e02f2ddc823c75f552c4c4
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
# starting a pyspark session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('RecommendersHW').getOrCreate()

In [None]:
# verify that the session was created

spark

#### Loading the Dataset

In [None]:
# NOTE: this dataset comes from https://nijianmo.github.io/amazon/index.html; under the "subsets" section, find Amazon Fashion and click "ratings only", or use the direct link below

# DIRECT LINK (file is large and may take a minute to load) - https://jmcauley.ucsd.edu/data/amazon_v2/categoryFilesSmall/AMAZON_FASHION.csv

In [None]:
# First create a pandas DataFrame from a copy of the CSV file on my Google Drive,
# then convert it to a PySpark DataFrame. This was the simplest way I could find to load
# the dataset without having to download it locally.

import pandas as pd

url = "https://drive.google.com/uc?id=1h9vuWEGXPObYqh1RAxhRzCXTIFxmB_w2"
df = pd.read_csv(url, header = None)
df.head(5)

Unnamed: 0,0,1,2,3
0,7106116521,A1D4G1SNUZWQOT,5.0,1413763200
1,7106116521,A3DDWDH9PX2YX2,2.0,1411862400
2,7106116521,A2MWC41EW7XL15,4.0,1408924800
3,7106116521,A2UH2QQ275NV45,2.0,1408838400
4,7106116521,A89F3LQADZBS5,3.0,1406419200
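
The file has no header row, which is why pandas labels the columns 0 to 3. As a minimal sketch, the same rows can be parsed with explicit column names using only the standard library. The names `item`, `user`, `rating`, and `timestamp` are assumptions based on the dataset page's description of the ratings-only files as (item, user, rating, timestamp) tuples. The inline string stands in for the real file, and `parse_ratings` is a hypothetical helper.

```python
import csv
import io

# First rows of the file as shown above (no header row).
raw = """7106116521,A1D4G1SNUZWQOT,5.0,1413763200
7106116521,A3DDWDH9PX2YX2,2.0,1411862400
7106116521,A2MWC41EW7XL15,4.0,1408924800
"""

# Assumed column names; the CSV itself carries none.
FIELDS = ["item", "user", "rating", "timestamp"]


def parse_ratings(text):
    """Parse header-less rating rows into dicts with typed values."""
    rows = []
    for rec in csv.DictReader(io.StringIO(text), fieldnames=FIELDS):
        rows.append({
            "item": rec["item"],
            "user": rec["user"],
            "rating": float(rec["rating"]),
            "timestamp": int(rec["timestamp"]),
        })
    return rows


parsed = parse_ratings(raw)
print(parsed[0])
```

Naming the columns at load time makes it harder to mix up the item and user columns later.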


In [None]:
df.count()

0    883636
1    883636
2    883636
3    883636
dtype: int64

In [None]:
# NOTE: This cell may take 30 seconds to a minute to run

# converting the pandas DataFrame into a PySpark DataFrame so it can be used in the Spark session

fashion = spark.createDataFrame(df) 
fashion.printSchema()
fashion.show()

root
 |-- 0: string (nullable = true)
 |-- 1: string (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3: long (nullable = true)

+----------+--------------+---+----------+
|         0|             1|  2|         3|
+----------+--------------+---+----------+
|7106116521|A1D4G1SNUZWQOT|5.0|1413763200|
|7106116521|A3DDWDH9PX2YX2|2.0|1411862400|
|7106116521|A2MWC41EW7XL15|4.0|1408924800|
|7106116521|A2UH2QQ275NV45|2.0|1408838400|
|7106116521| A89F3LQADZBS5|3.0|1406419200|
|7106116521|A29HLOUW0NS0EH|5.0|1405728000|
|7106116521| A7QS961ROI6E0|4.0|1401494400|
|B00007GDFV|A1BB77SEBQT8VX|3.0|1379808000|
|B00007GDFV| AHWOW7D1ABO9C|3.0|1374019200|
|B00007GDFV| AKS3GULZE0HFC|3.0|1365811200|
|B00007GDFV| A38NS6NF6WPXS|4.0|1362787200|
|B00007GDFV|A1KOKO3HTSAI1H|2.0|1359244800|
|B00007GDFV|A1G3S57JGZNPCL|1.0|1357257600|
|B00007GDFV| AGBL3TTP6GV4X|1.0|1343606400|
|B00007GDFV|A1Y36BSE9GKXLV|4.0|1268352000|
|B00007GDFV|A1L1U968VNYVA4|3.0|1509408000|
|B00007GDFV|A1NSKPSR0XZ0C9|5.0|1508803200|
|B00

In [7]:
print(f'There are {fashion.count()} ratings in this dataset')

There are 883636 ratings in this dataset


#### EDA and Pre-Processing of the Data

In [8]:
# In order to optimize speed of this model for the purposes of this assignment, I will only use a sample of this dataset

sample = fashion.sample(withReplacement=False, fraction=0.10)
sample.show(5)

+----------+--------------+---+----------+
|         0|             1|  2|         3|
+----------+--------------+---+----------+
|B00007GDFV|A1Y36BSE9GKXLV|4.0|1268352000|
|B00007GDFV|A199RALD1NLRC1|4.0|1498089600|
|B00007GDFV|A1JOKKNG15VE62|5.0|1469836800|
|B00007GDFV| ALJEJH7DMUVB0|1.0|1462924800|
|B00007GDFV|A1ALMLGB7P0P9W|4.0|1457568000|
+----------+--------------+---+----------+
only showing top 5 rows



In [9]:
print(f'There are {sample.count()} ratings in this sample dataset')

There are 88728 ratings in this sample dataset


In [10]:
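# Renaming column headers to reflect their meanings
# NOTE: the dataset page describes these files as (item, user, rating, timestamp) tuples,
# so column "0" holds product ASINs and column "1" holds reviewer IDs. The names below
# label them the other way around, which swaps the roles of users and products downstream.

ratings = (
    sample.withColumnRenamed("0", "user_id")
    .withColumnRenamed("1", "product_id")
    .withColumnRenamed("2", "rating")
    .withColumnRenamed("3", "time_stamp")
)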
ratings.show(5)

+----------+--------------+------+----------+
|   user_id|    product_id|rating|time_stamp|
+----------+--------------+------+----------+
|B00007GDFV|A1Y36BSE9GKXLV|   4.0|1268352000|
|B00007GDFV|A199RALD1NLRC1|   4.0|1498089600|
|B00007GDFV|A1JOKKNG15VE62|   5.0|1469836800|
|B00007GDFV| ALJEJH7DMUVB0|   1.0|1462924800|
|B00007GDFV|A1ALMLGB7P0P9W|   4.0|1457568000|
+----------+--------------+------+----------+
only showing top 5 rows



In [11]:
# group by (user, product) pair to consolidate duplicate entries, in case a user rated the same item more than once
# each group of duplicates is replaced by a single row holding the average of its ratings

test = ratings.groupBy('user_id', 'product_id').avg('rating').withColumnRenamed('avg(rating)', 'rating')
test.show(5)

+----------+--------------+------+
|   user_id|    product_id|rating|
+----------+--------------+------+
|B00063VWSA|   AYPV8Y6PMVY|   5.0|
|B000EB3RXM|A1V2XAGY86LIU0|   1.0|
|B000EE1NNA| AYCZAFTJ6GBTH|   5.0|
|B000GHMRLW| AY4XU6B5ZKN0K|   1.0|
|B000KPIHQ4| A7J5XD1A4R0X2|   1.0|
+----------+--------------+------+
only showing top 5 rows



In [12]:
# if there are any duplicates, the first number will be less than the number of ratings in the sample
print(test.count())
print(sample.count())

88653
88728


<b> Since duplicates were found (88,653 distinct user-product pairs versus 88,728 ratings in the sample), the averaged rating for each pair will be used to train the recommender. </b>
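
As a rough illustration of what the `groupBy(...).avg('rating')` step does, the sketch below collapses repeated pairs in plain Python. The toy rows and the `average_duplicates` helper are hypothetical and not part of the notebook.

```python
from collections import defaultdict


def average_duplicates(rows):
    """Collapse repeated (user, product) pairs into one entry with the mean rating."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [sum of ratings, count]
    for user, product, rating in rows:
        acc = totals[(user, product)]
        acc[0] += rating
        acc[1] += 1
    return {key: total / count for key, (total, count) in totals.items()}


# Hypothetical toy data: the pair ("u1", "p1") appears twice.
toy = [("u1", "p1", 5.0), ("u1", "p1", 3.0), ("u2", "p1", 4.0)]
deduped = average_duplicates(toy)
print(len(toy), "->", len(deduped))
print(deduped[("u1", "p1")])
```

The row count drops by one for every extra copy of a pair, which is the same comparison the two `count()` calls above make.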

In [13]:
new = test

In [14]:
# creating an integer id column for user and product

from pyspark.ml.feature import StringIndexer

user_indexer = StringIndexer(inputCol="user_id", outputCol="user")
new = user_indexer.fit(new).transform(new)
new.show(1)

+----------+-----------+------+------+
|   user_id| product_id|rating|  user|
+----------+-----------+------+------+
|B00063VWSA|AYPV8Y6PMVY|   5.0|2501.0|
+----------+-----------+------+------+
only showing top 1 row



In [15]:
product_indexer = StringIndexer(inputCol="product_id", outputCol="product")
new = product_indexer.fit(new).transform(new)
new.show(1)

+----------+-----------+------+------+-------+
|   user_id| product_id|rating|  user|product|
+----------+-----------+------+------+-------+
|B00063VWSA|AYPV8Y6PMVY|   5.0|2501.0|85922.0|
+----------+-----------+------+------+-------+
only showing top 1 row



In [16]:
new.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- user: double (nullable = false)
 |-- product: double (nullable = false)
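
`StringIndexer` replaces each distinct string with a numeric index stored as a double, which is why `user` and `product` appear as `double` in the schema. With its default ordering, the most frequent label receives index 0. The sketch below mimics that behavior in plain Python; the alphabetical tie-break and the `fit_string_index` helper are assumptions made for illustration, not Spark's code.

```python
from collections import Counter


def fit_string_index(labels):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count, then alphabetically (tie-break is an assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


ids = [
    "B00007GDFV", "7106116521", "B00007GDFV",
    "B00063VWSA", "B00007GDFV", "7106116521",
]
index = fit_string_index(ids)
print(index)
```

Under this ordering, a low index means the ID appears often in the data.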



In [17]:
# now create the new dataframe that will be used to train the model

new_ratings = new.select('user', 'product', 'rating')
new_ratings.show(5)

+------+-------+------+
|  user|product|rating|
+------+-------+------+
|2501.0|85922.0|   5.0|
| 636.0|21331.0|   1.0|
|  94.0|85693.0|   5.0|
|  17.0| 1772.0|   1.0|
|   1.0|69098.0|   1.0|
+------+-------+------+
only showing top 5 rows



In [18]:
# use PySpark's randomSplit function to split the data into a train set and a test set

(train, test) = new_ratings.randomSplit([0.8, 0.2], seed = 42)
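
`randomSplit` assigns rows to the splits at random, so the 80/20 proportions hold only approximately. The sketch below shows the idea in plain Python, assuming each row gets one uniform draw that is compared against cumulative weights. The `random_split` helper is hypothetical, and Python's generator differs from Spark's, so the same seed will not reproduce Spark's split.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one bucket by comparing a uniform draw to cumulative weights."""
    total = sum(weights)
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)
    buckets = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last bucket catches any floating-point shortfall in the bounds.
            if draw < bound or i == len(bounds) - 1:
                buckets[i].append(row)
                break
    return buckets


train_rows, test_rows = random_split(list(range(1000)), [0.8, 0.2], seed=42)
print(len(train_rows), len(test_rows))
```

Every row lands in exactly one bucket, so the two sizes always add up to the input size even though neither is exactly 800 or 200.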

#### Building the Model

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator, RankingEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Set coldStartStrategy to 'drop' so that rows with NaN predictions
# (users or products that never appear in the training set) are dropped

als = ALS(maxIter=3, regParam=0.02, userCol='user', itemCol='product', ratingCol='rating',
          rank=15, coldStartStrategy='drop', implicitPrefs=False)

# fit model to training data
model = als.fit(train)

In [20]:
# using the fitted model to generate predictions for the test data
predictions = model.transform(test)
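
For intuition about what `als.fit` is doing, here is a minimal rank-1 sketch of alternating least squares in plain Python. It holds the item factors fixed and solves for each user factor in closed form, then swaps roles. This is an illustration under simplifying assumptions (rank 1, a hypothetical `als_rank1` helper, toy data), not Spark's implementation, which uses `rank=15` here and solves a small linear system per user or item.

```python
def als_rank1(ratings, n_users, n_items, reg=0.02, iters=50):
    """Rank-1 ALS on (user, item, rating) triples with L2 regularization."""
    u = [1.0] * n_users
    v = [1.0] * n_items
    for _ in range(iters):
        # Fix item factors; each user factor has a closed-form ridge solution.
        num = [0.0] * n_users
        den = [reg] * n_users
        for i, j, r in ratings:
            num[i] += r * v[j]
            den[i] += v[j] * v[j]
        u = [n / d for n, d in zip(num, den)]
        # Fix user factors; solve each item factor the same way.
        num = [0.0] * n_items
        den = [reg] * n_items
        for i, j, r in ratings:
            num[j] += r * u[i]
            den[j] += u[i] * u[i]
        v = [n / d for n, d in zip(num, den)]
    return u, v


# Hypothetical toy data: user 1 rates everything twice as high as user 0.
# The rating of item 2 by user 1 (which would be 5.0) is held out.
observed = [(0, 0, 1.0), (0, 1, 2.0), (0, 2, 2.5), (1, 0, 2.0), (1, 1, 4.0)]
u, v = als_rank1(observed, n_users=2, n_items=3)
held_out_prediction = u[1] * v[2]
print(held_out_prediction)
```

The held-out prediction is the product of the user factor and the item factor, which is the rank-1 case of the dot product that a higher-rank model uses.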

#### Testing the Model

In [21]:
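from pyspark.sql import functions as F  # avoids shadowing Python's built-in round()

# sort by product index and add a column with each prediction rounded to the nearest whole rating
predictions_sorted = predictions.sort("product", ascending=True)

predictions_sorted = predictions_sorted.withColumn("prediction_rounded", F.round("prediction"))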

In [23]:
# first 5 test rows, ordered by product index, showing each actual rating next to its predicted rating

predictions_sorted.show(5)

+------+-------+------+----------+------------------+
|  user|product|rating|prediction|prediction_rounded|
+------+-------+------+----------+------------------+
|  99.0|    4.0|   5.0|0.52628285|               1.0|
|9656.0|    6.0|   5.0|-0.9625372|              -1.0|
|  40.0|    7.0|   4.0|0.16567913|               0.0|
|2313.0|    8.0|   4.0|0.71988136|               1.0|
|3019.0|    8.0|   2.0|  1.602956|               2.0|
+------+-------+------+----------+------------------+
only showing top 5 rows



In [None]:
# NOTE:  The following cell will take approximately 5-10 minutes to run

In [24]:
# Generate top 10 product recommendations for each user
userRecs = model.recommendForAllUsers(10)
userRecs.show(5, truncate = False)

# Generate top 10 user recommendations for each product
productRecs = model.recommendForAllItems(10)
productRecs.show(5, truncate = False)

+----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user|recommendations                                                                                                                                                                                       |
+----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1   |[{1722, 4.999169}, {1677, 4.999169}, {1634, 4.999169}, {1519, 4.999169}, {1465, 4.999169}, {1401, 4.999169}, {1361, 4.999169}, {1296, 4.999169}, {1191, 4.999169}, {1146, 4.999169}]                  |
|3   |[{1427, 5.177749}, {84593, 4.9958973}, {84316, 4.9958973}, {83971, 4.9958973}, {83091, 4.9958973}, {81501, 4.9958973}, {78259, 4.9958973}, {78205, 4.9958973}, {77919, 4.9
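
Conceptually, a top-N recommendation for one user scores every product by the dot product of the user's factor vector with the product's factor vector and keeps the N highest scores. The sketch below shows that idea in plain Python. The factors are hypothetical toy values and `recommend_top_k` is an illustrative helper, not the routine Spark runs.

```python
import heapq


def recommend_top_k(user_factor, item_factors, k):
    """Score every item by dot product with the user factor and keep the k best."""
    scores = (
        (item_id, sum(a * b for a, b in zip(user_factor, factor)))
        for item_id, factor in item_factors.items()
    )
    return heapq.nlargest(k, scores, key=lambda pair: pair[1])


# Hypothetical rank-2 factors for three items.
items = {10: [1.0, 0.0], 11: [0.5, 0.25], 12: [0.0, 2.0]}
top = recommend_top_k([1.0, 1.0], items, k=2)
print(top)
```

Scoring every user against every product is what makes the cell above slow on the full index range.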

In [25]:
from pyspark.ml.evaluation import RegressionEvaluator

# evaluating model's results with RMSE (computed on the rounded predictions)
evaluator_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction_rounded")
rmse = evaluator_rmse.evaluate(predictions_sorted)
print("Root-mean-square error = " + str(rmse))

# evaluating model's results with MAE
evaluator_mae = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction_rounded")
mae = evaluator_mae.evaluate(predictions_sorted)
print("Mean Absolute Error = " + str(mae))

# evaluating model's results with MSE
evaluator_mse = RegressionEvaluator(metricName="mse", labelCol="rating", predictionCol="prediction_rounded")
mse = evaluator_mse.evaluate(predictions_sorted)
print("Mean Squared Error = " + str(mse))

Root-mean-square error = 4.445422376412846
Mean Absolute Error = 4.096858638743456
Mean Squared Error = 19.761780104712038
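
The three metrics are closely related: MSE is the mean of the squared errors, RMSE is its square root, and MAE is the mean of the absolute errors. The sketch below computes them in plain Python on the five rows displayed in the predictions table earlier, using the `rating` and `prediction_rounded` columns. Because it uses only those five rows, its values will differ from the full test-set figures above. The `regression_metrics` helper is hypothetical.

```python
import math


def regression_metrics(labels, preds):
    """Return RMSE, MAE, and MSE for paired labels and predictions."""
    errors = [p - y for y, p in zip(labels, preds)]
    mse = sum(e * e for e in errors) / len(errors)
    mae = sum(abs(e) for e in errors) / len(errors)
    return {"rmse": math.sqrt(mse), "mae": mae, "mse": mse}


# Actual ratings and rounded predictions from the five rows shown earlier.
labels = [5.0, 5.0, 4.0, 4.0, 2.0]
rounded = [1.0, -1.0, 0.0, 1.0, 2.0]
metrics = regression_metrics(labels, rounded)
print(metrics)
```

The reported figures follow the same relationship: 4.4454 squared is approximately 19.7618.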
