
In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()

In [None]:
findspark.find()

'/content/spark-3.1.2-bin-hadoop3.2'

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Recommendations').getOrCreate()
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col

In [None]:
metaPath = '/content/drive/MyDrive/Springboard/Amazon Reviews Data/meta_Magazine_Subscriptions.json'
reviewPath = '/content/drive/MyDrive/Springboard/Amazon Reviews Data/Magazine_Subscriptions.json'

metaDF = spark.read.json(metaPath)
reviewDF = spark.read.json(reviewPath)


In [None]:
metaDF.show()

+--------------------+--------------------+----------+--------------------+--------------------+----+--------------------+--------------------+-------+---+--------------------+--------------------+--------------------+-----+--------------------+------------+-----+-----+--------------------+
|            also_buy|           also_view|      asin|               brand|            category|date|         description|             details|feature|fit|            imageURL|     imageURLHighRes|            main_cat|price|                rank|similar_item|tech1|tech2|               title|
+--------------------+--------------------+----------+--------------------+--------------------+----+--------------------+--------------------+-------+---+--------------------+--------------------+--------------------+-----+--------------------+------------+-----+-----+--------------------+
|[B002PXVYLE, B01M...|[B002PXVYLE, B000...|B00005N7NQ|     Reason Magazine|[Magazine Subscri...|    |[REASON is edited...|{B

In [None]:
reviewDF.show()

+----------+-----+-------+--------------------+-----------+--------------+------------------+-----------------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID|      reviewerName|            style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+-----------+--------------+------------------+-----------------+--------------------+--------------+--------+----+
|B00005N7P0| null|    5.0|for computer enth...| 11 8, 2001| AH2IFH762VY5U|      ted sedlmayr|             null|AVID READER SINCE...|    1005177600|   false|   9|
|B00005N7P0| null|    5.0|Thank god this is...|10 31, 2001| AOSFI0JEYU4XM|   Amazon Customer|             null|  The straight scoop|    1004486400|   false|   9|
|B00005N7OJ| null|    3.0|Antiques Magazine...|03 24, 2007|A3JPFWKS83R49V|       Bryan Carey|{ Print Magazine}|Antiques Magazine...|    1174694400|   false|  14|
|B00005N7OJ| null|    5.0|Th

In [None]:
# Drop duplicates
metaDF = metaDF.dropDuplicates()
reviewDF = reviewDF.dropDuplicates()

In [None]:
#check how many products, reviews and reviewers there are
print('There are {} unique products and {} unique reviews made by {} unique reviewers'.format(
    metaDF.count(), reviewDF.count(), reviewDF.select('reviewerID').distinct().count()
))

There are 2320 unique products and 88496 unique reviews made by 72098 unique reviewers


In [None]:
#collect relevant data for ALS CF
ratings = reviewDF[['reviewerID', 'asin', 'overall']]
ratings.show()

+--------------+----------+-------+
|    reviewerID|      asin|overall|
+--------------+----------+-------+
|A1KW68OP8JGZN7|B00005N7P0|    5.0|
| AZ0HINOWMIJ2X|B00005N7OD|    5.0|
|A33DGWR9R1LMSN|B00005N7OV|    2.0|
|A1SGKJEF8JLZN4|B00005N7Q1|    5.0|
|A1FS2188LNICAQ|B00005N7Q1|    5.0|
|A3EDVOA3YVOZ64|B00005N7OV|    5.0|
| ANIFMRZOVLORV|B00005N7Q1|    5.0|
|A2E2RTYY77DSF9|B00005N7SC|    3.0|
|A2YSJYZ2NJISBB|B00005N7OV|    3.0|
| A83TW10AX3OQM|B00005N7O6|    3.0|
|A1HQ8WATO130TZ|B00005N7SC|    4.0|
|A3HO0FX4WC9YTU|B00005N7Q1|    5.0|
| A82BF05B2NLV5|B00005N7NQ|    5.0|
|A1XZ00QY7UTNM3|B00005N7OV|    4.0|
|A32L97DIPDSJR8|B00005N7SC|    2.0|
|A3O9IT2C9NTYWV|B00005N7S8|    5.0|
|A12FQUL8LUKPJM|B00005N7T3|    4.0|
| ATHM0ZECHGM7O|B00005N7Q1|    4.0|
|A3KEV5K607E277|B00005N7PG|    5.0|
|A25GPV9SQ2LY34|B00005N7PG|    5.0|
+--------------+----------+-------+
only showing top 20 rows



In [None]:
ratings = ratings.withColumnRenamed('asin', 'productID').withColumnRenamed('overall', 'rating')
ratings.show()

+--------------+----------+------+
|    reviewerID| productID|rating|
+--------------+----------+------+
|A1KW68OP8JGZN7|B00005N7P0|   5.0|
| AZ0HINOWMIJ2X|B00005N7OD|   5.0|
|A33DGWR9R1LMSN|B00005N7OV|   2.0|
|A1SGKJEF8JLZN4|B00005N7Q1|   5.0|
|A1FS2188LNICAQ|B00005N7Q1|   5.0|
|A3EDVOA3YVOZ64|B00005N7OV|   5.0|
| ANIFMRZOVLORV|B00005N7Q1|   5.0|
|A2E2RTYY77DSF9|B00005N7SC|   3.0|
|A2YSJYZ2NJISBB|B00005N7OV|   3.0|
| A83TW10AX3OQM|B00005N7O6|   3.0|
|A1HQ8WATO130TZ|B00005N7SC|   4.0|
|A3HO0FX4WC9YTU|B00005N7Q1|   5.0|
| A82BF05B2NLV5|B00005N7NQ|   5.0|
|A1XZ00QY7UTNM3|B00005N7OV|   4.0|
|A32L97DIPDSJR8|B00005N7SC|   2.0|
|A3O9IT2C9NTYWV|B00005N7S8|   5.0|
|A12FQUL8LUKPJM|B00005N7T3|   4.0|
| ATHM0ZECHGM7O|B00005N7Q1|   4.0|
|A3KEV5K607E277|B00005N7PG|   5.0|
|A25GPV9SQ2LY34|B00005N7PG|   5.0|
+--------------+----------+------+
only showing top 20 rows



In [None]:
def get_mat_sparsity(ratings):
    # Numerator: the number of observed ratings (filled cells)
    count_nonzero = ratings.select("rating").count()

    # Denominator: every possible reviewer x product cell,
    # i.e. distinct reviewerIDs times distinct productIDs
    total_elements = ratings.select("reviewerID").distinct().count() * ratings.select("productID").distinct().count()

    # Sparsity is the percentage of cells that have no rating
    sparsity = (1.0 - (count_nonzero * 1.0) / total_elements) * 100
    print("The ratings dataframe is ", "%.3f" % sparsity + "% sparse.")

get_mat_sparsity(ratings)

The ratings dataframe is  99.949% sparse.
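
As a sanity check on the formula used by `get_mat_sparsity`, here is a minimal plain-Python sketch of the same calculation. The function name `sparsity_percent` and the toy ratings are made up for illustration and are not part of the dataset.

```python
def sparsity_percent(ratings):
    """Return the percentage of empty cells in the reviewer x product matrix.

    `ratings` is an iterable of (reviewer_id, product_id, rating) tuples.
    """
    ratings = list(ratings)
    reviewers = {r[0] for r in ratings}
    products = {r[1] for r in ratings}
    # Every reviewer could in principle rate every product
    total_cells = len(reviewers) * len(products)
    return (1.0 - len(ratings) / total_cells) * 100


# Made-up toy data: 3 reviewers, 4 products, 5 ratings
toy_ratings = [
    ("u1", "p1", 5.0),
    ("u1", "p2", 3.0),
    ("u2", "p2", 4.0),
    ("u3", "p3", 2.0),
    ("u3", "p4", 5.0),
]

print("%.3f%% sparse" % sparsity_percent(toy_ratings))  # 58.333% sparse
```

With 5 ratings in a 3 x 4 grid, 7 of the 12 cells are empty, which gives 58.333%. The real matrix is far emptier because most reviewers rate only one magazine.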


In [None]:
ratings.printSchema()

root
 |-- reviewerID: string (nullable = true)
 |-- productID: string (nullable = true)
 |-- rating: double (nullable = true)



In [None]:
reviewer_indexer = StringIndexer(inputCol="reviewerID", outputCol="reviewerIndex")
product_indexer = StringIndexer(inputCol="productID", outputCol="productIndex")

pipeline = Pipeline(stages = [reviewer_indexer, product_indexer])

ratingsIndexed = pipeline.fit(ratings).transform(ratings)

ratingsIndexed.show()

+--------------+----------+------+-------------+------------+
|    reviewerID| productID|rating|reviewerIndex|productIndex|
+--------------+----------+------+-------------+------------+
|A1KW68OP8JGZN7|B00005N7P0|   5.0|      19857.0|        94.0|
| AZ0HINOWMIJ2X|B00005N7OD|   5.0|      71642.0|       100.0|
|A33DGWR9R1LMSN|B00005N7OV|   2.0|       6993.0|        15.0|
|A1SGKJEF8JLZN4|B00005N7Q1|   5.0|       4405.0|         1.0|
|A1FS2188LNICAQ|B00005N7Q1|   5.0|      17553.0|         1.0|
|A3EDVOA3YVOZ64|B00005N7OV|   5.0|      49525.0|        15.0|
| ANIFMRZOVLORV|B00005N7Q1|   5.0|      66430.0|         1.0|
|A2E2RTYY77DSF9|B00005N7SC|   3.0|      33036.0|         2.0|
|A2YSJYZ2NJISBB|B00005N7OV|   3.0|       2055.0|        15.0|
| A83TW10AX3OQM|B00005N7O6|   3.0|       2510.0|       247.0|
|A1HQ8WATO130TZ|B00005N7SC|   4.0|      18440.0|         2.0|
|A3HO0FX4WC9YTU|B00005N7Q1|   5.0|      51058.0|         1.0|
| A82BF05B2NLV5|B00005N7NQ|   5.0|      59392.0|       156.0|
|A1XZ00Q
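
With its default settings, `StringIndexer` orders labels by descending frequency, so the most frequently seen reviewer or product gets index 0.0. That is why a heavily reviewed product such as B00005N7Q1 receives a small index. The plain-Python sketch below mimics that ordering on made-up labels; the helper name `frequency_desc_index` and the alphabetical tie-break are assumptions of this sketch rather than a copy of Spark's implementation.

```python
from collections import Counter


def frequency_desc_index(labels):
    """Map each distinct label to a float index, most frequent label first.

    Ties are broken alphabetically here; that tie-break is an assumption
    of this sketch.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Made-up product IDs, not taken from the dataset
toy_products = ["B", "A", "B", "C", "B", "A"]
toy_index = frequency_desc_index(toy_products)
print(toy_index)  # {'B': 0.0, 'A': 1.0, 'C': 2.0}
```

The indices are floats, as in the `StringIndexer` output, which is why the notebook later casts them to integers before joining back to the original IDs.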

In [None]:
ratingsDF = ratingsIndexed[['reviewerIndex', 'productIndex', 'rating']]
ratingsDF = ratingsDF.withColumnRenamed('reviewerIndex', 'reviewerID').withColumnRenamed('productIndex', 'productID')
ratingsDF.show()

+----------+---------+------+
|reviewerID|productID|rating|
+----------+---------+------+
|   19857.0|     94.0|   5.0|
|   71642.0|    100.0|   5.0|
|    6993.0|     15.0|   2.0|
|    4405.0|      1.0|   5.0|
|   17553.0|      1.0|   5.0|
|   49525.0|     15.0|   5.0|
|   66430.0|      1.0|   5.0|
|   33036.0|      2.0|   3.0|
|    2055.0|     15.0|   3.0|
|    2510.0|    247.0|   3.0|
|   18440.0|      2.0|   4.0|
|   51058.0|      1.0|   5.0|
|   59392.0|    156.0|   5.0|
|    4742.0|     15.0|   4.0|
|   44189.0|      2.0|   2.0|
|   53967.0|    292.0|   5.0|
|   11418.0|     90.0|   4.0|
|   69144.0|      1.0|   4.0|
|    1018.0|    139.0|   5.0|
|   29200.0|    139.0|   5.0|
+----------+---------+------+
only showing top 20 rows



In [None]:
ratingsDF.printSchema()

root
 |-- reviewerID: double (nullable = false)
 |-- productID: double (nullable = false)
 |-- rating: double (nullable = true)



In [None]:
# Create test and train set
(train, test) = ratingsDF.randomSplit([0.8, 0.2], seed = 1)

In [None]:
# Create ALS model
als = ALS(
         userCol="reviewerID", 
         itemCol="productID",
         ratingCol="rating", 
         nonnegative = True, 
         implicitPrefs = False,
         coldStartStrategy="drop"
)

In [None]:
# Add hyperparameters and their respective values to param_grid
# .addGrid(als.maxIter, [5, 50, 100, 200]) \

param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [1, 2, 5, 10, 20]) \
            .addGrid(als.regParam, [0.1, 0.15, 0.2, 0.25, 0.3]) \
            .build()


In [None]:
# Define the evaluator (RMSE) and print the number of models in the parameter grid
evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="rating", 
           predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  25


In [None]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

In [None]:
# Fit cross validator to the 'train' dataset
model = cv.fit(train)
# Extract best model from the cv model above
best_model = model.bestModel
# Predict on the held-out test set and compute the RMSE
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

1.2149262175043927
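
The value printed above is the root mean squared error between the actual and predicted ratings on the test set. Because the model uses `coldStartStrategy="drop"`, rows whose reviewer or product was not seen during training are dropped before scoring. A minimal plain-Python sketch of the metric, using made-up numbers and a hypothetical helper named `rmse`:

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up ratings and predictions, only to exercise the formula
toy_labels = [5.0, 3.0, 4.0, 1.0]
toy_predictions = [4.0, 3.0, 2.0, 2.0]

# Squared errors are 1, 0, 4, 1, so the mean is 1.5
print(rmse(toy_labels, toy_predictions))
```

An RMSE of about 1.21 therefore means the predictions miss by a little more than one star in the root-mean-square sense.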


In [None]:
print("**Best Model**")
# Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())
# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

**Best Model**
  Rank: 1
  MaxIter: 10
  RegParam: 0.25


In [None]:
# Generate the top 5 recommendations for every user
recommendations = best_model.recommendForAllUsers(5)
recommendations.show()

+----------+--------------------+
|reviewerID|     recommendations|
+----------+--------------------+
|       148|[{2038, 7.6304517...|
|       463|[{2038, 4.522575}...|
|       471|[{2038, 7.937259}...|
|       496|[{2038, 3.7214746...|
|       833|[{2038, 3.3086932...|
|      1088|[{2038, 3.6437275...|
|      1238|[{2038, 7.770404}...|
|      1342|[{2038, 5.4820113...|
|      1580|[{2038, 8.30215},...|
|      1591|[{2038, 2.377523}...|
|      1645|[{2038, 8.05496},...|
|      1829|[{2038, 7.7977138...|
|      1959|[{2038, 7.0777416...|
|      2122|[{2038, 6.810679}...|
|      2142|[{2038, 7.911775}...|
|      2366|[{2038, 4.9246454...|
|      2659|[{2038, 6.452627}...|
|      2866|[{2038, 6.3854504...|
|      3175|[{2038, 7.505753}...|
|      3749|[{2038, 7.752999}...|
+----------+--------------------+
only showing top 20 rows



In [None]:
recommendations.limit(10).show()

+----------+--------------------+
|reviewerID|     recommendations|
+----------+--------------------+
|       148|[{2038, 7.6304517...|
|       463|[{2038, 4.522575}...|
|       471|[{2038, 7.937259}...|
|       496|[{2038, 3.7214746...|
|       833|[{2038, 3.3086932...|
|      1088|[{2038, 3.6437275...|
|      1238|[{2038, 7.770404}...|
|      1342|[{2038, 5.4820113...|
|      1580|[{2038, 8.30215},...|
|      1591|[{2038, 2.377523}...|
+----------+--------------------+



In [None]:
recommendations.printSchema()

root
 |-- reviewerID: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- productID: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [None]:
recommendationsRenamed = recommendations.withColumn('rec_exp', explode('recommendations')).select('reviewerID', col("rec_exp.productID"), col("rec_exp.rating"))
recommendationsRenamed.show()

+----------+---------+---------+
|reviewerID|productID|   rating|
+----------+---------+---------+
|       148|     2038|7.6304517|
|       148|     1791| 6.363171|
|       148|     1964|6.2831044|
|       148|     1581|6.1109095|
|       148|     1925| 6.009438|
|       463|     2038| 4.522575|
|       463|     1791|3.7714567|
|       463|     1964|3.7240012|
|       463|     1581| 3.621941|
|       463|     1925|3.5617988|
|       471|     2038| 7.937259|
|       471|     1791|6.6190233|
|       471|     1964|6.5357375|
|       471|     1581| 6.356619|
|       471|     1925|6.2510676|
|       496|     2038|3.7214746|
|       496|     1791|3.1034048|
|       496|     1964|3.0643551|
|       496|     1581|2.9803734|
|       496|     1925|2.9308844|
+----------+---------+---------+
only showing top 20 rows



In [None]:
recommendationsRenamed = recommendationsRenamed.withColumnRenamed('reviewerID', 'reviewerIndex').withColumnRenamed('productID', 'productIndex').withColumnRenamed('rating', 'predictedRating')
recommendationsRenamed.show()

+-------------+------------+---------------+
|reviewerIndex|productIndex|predictedRating|
+-------------+------------+---------------+
|          148|        2038|      7.6304517|
|          148|        1791|       6.363171|
|          148|        1964|      6.2831044|
|          148|        1581|      6.1109095|
|          148|        1925|       6.009438|
|          463|        2038|       4.522575|
|          463|        1791|      3.7714567|
|          463|        1964|      3.7240012|
|          463|        1581|       3.621941|
|          463|        1925|      3.5617988|
|          471|        2038|       7.937259|
|          471|        1791|      6.6190233|
|          471|        1964|      6.5357375|
|          471|        1581|       6.356619|
|          471|        1925|      6.2510676|
|          496|        2038|      3.7214746|
|          496|        1791|      3.1034048|
|          496|        1964|      3.0643551|
|          496|        1581|      2.9803734|
|         

In [None]:
ratingsIndexed = ratingsIndexed.withColumn('productIndex', ratingsIndexed.productIndex.cast(IntegerType()))\
                              .withColumn('reviewerIndex', ratingsIndexed.reviewerIndex.cast(IntegerType()))

In [None]:
ratingsIndexed.show()

+--------------+----------+------+-------------+------------+
|    reviewerID| productID|rating|reviewerIndex|productIndex|
+--------------+----------+------+-------------+------------+
|A1KW68OP8JGZN7|B00005N7P0|   5.0|        19857|          94|
| AZ0HINOWMIJ2X|B00005N7OD|   5.0|        71642|         100|
|A33DGWR9R1LMSN|B00005N7OV|   2.0|         6993|          15|
|A1SGKJEF8JLZN4|B00005N7Q1|   5.0|         4405|           1|
|A1FS2188LNICAQ|B00005N7Q1|   5.0|        17553|           1|
|A3EDVOA3YVOZ64|B00005N7OV|   5.0|        49525|          15|
| ANIFMRZOVLORV|B00005N7Q1|   5.0|        66430|           1|
|A2E2RTYY77DSF9|B00005N7SC|   3.0|        33036|           2|
|A2YSJYZ2NJISBB|B00005N7OV|   3.0|         2055|          15|
| A83TW10AX3OQM|B00005N7O6|   3.0|         2510|         247|
|A1HQ8WATO130TZ|B00005N7SC|   4.0|        18440|           2|
|A3HO0FX4WC9YTU|B00005N7Q1|   5.0|        51058|           1|
| A82BF05B2NLV5|B00005N7NQ|   5.0|        59392|         156|
|A1XZ00Q

In [None]:
# Join dataframe to obtain original reviewerIDs
recommendationsJoined = recommendationsRenamed.join(ratingsIndexed[['reviewerIndex', 'reviewerID']].distinct(), on = 'reviewerIndex', how = 'left')

In [None]:
# Join dataframe to obtain original productIDs
recommendationsJoined = recommendationsJoined.join(ratingsIndexed[['productIndex', 'productID']].distinct(), on = 'productIndex', how = 'left')

In [None]:
recommendationsJoined.show()

+------------+-------------+---------------+--------------+----------+
|productIndex|reviewerIndex|predictedRating|    reviewerID| productID|
+------------+-------------+---------------+--------------+----------+
|        2038|          148|      7.6304517|A2KEE2U6XXU9IB|B00006KFU3|
|        1791|          148|       6.363171|A2KEE2U6XXU9IB|B00006KBLW|
|        1964|          148|      6.2831044|A2KEE2U6XXU9IB|B00XII1UAK|
|        1581|          148|      6.1109095|A2KEE2U6XXU9IB|B019INBTD6|
|        1925|          148|       6.009438|A2KEE2U6XXU9IB|B00BIT16B4|
|        2038|          463|       4.522575|A1VFG5SVYMBH7K|B00006KFU3|
|        1791|          463|      3.7714567|A1VFG5SVYMBH7K|B00006KBLW|
|        1964|          463|      3.7240012|A1VFG5SVYMBH7K|B00XII1UAK|
|        1581|          463|       3.621941|A1VFG5SVYMBH7K|B019INBTD6|
|        1925|          463|      3.5617988|A1VFG5SVYMBH7K|B00BIT16B4|
|        2038|          471|       7.937259|A1ZTNMCY9VU44D|B00006KFU3|
|     

In [None]:
# Drop productIndex and reviewerIndex
recommendationsJoined = recommendationsJoined[['reviewerID', 'productID', 'predictedRating']]
recommendationsJoined.show()

+--------------+----------+---------------+
|    reviewerID| productID|predictedRating|
+--------------+----------+---------------+
|A2KEE2U6XXU9IB|B00006KFU3|      7.6304517|
|A2KEE2U6XXU9IB|B00006KBLW|       6.363171|
|A2KEE2U6XXU9IB|B00XII1UAK|      6.2831044|
|A2KEE2U6XXU9IB|B019INBTD6|      6.1109095|
|A2KEE2U6XXU9IB|B00BIT16B4|       6.009438|
|A1VFG5SVYMBH7K|B00006KFU3|       4.522575|
|A1VFG5SVYMBH7K|B00006KBLW|      3.7714567|
|A1VFG5SVYMBH7K|B00XII1UAK|      3.7240012|
|A1VFG5SVYMBH7K|B019INBTD6|       3.621941|
|A1VFG5SVYMBH7K|B00BIT16B4|      3.5617988|
|A1ZTNMCY9VU44D|B00006KFU3|       7.937259|
|A1ZTNMCY9VU44D|B00006KBLW|      6.6190233|
|A1ZTNMCY9VU44D|B00XII1UAK|      6.5357375|
|A1ZTNMCY9VU44D|B019INBTD6|       6.356619|
|A1ZTNMCY9VU44D|B00BIT16B4|      6.2510676|
|A2F1GA6WIQW3ZV|B00006KFU3|      3.7214746|
|A2F1GA6WIQW3ZV|B00006KBLW|      3.1034048|
|A2F1GA6WIQW3ZV|B00XII1UAK|      3.0643551|
|A2F1GA6WIQW3ZV|B019INBTD6|      2.9803734|
|A2F1GA6WIQW3ZV|B00BIT16B4|     

In [None]:
# Let's look at reviewer A5RHZE7B8SV5Q's recommendations
# show() returns None, so keep the DataFrame and display it in a separate step
recommendationsWithMeta = recommendationsJoined.join(metaDF, metaDF['asin'] == recommendationsJoined['productID'], how = 'left') \
    .filter("reviewerID = 'A5RHZE7B8SV5Q'") \
    .select('productID', 'title', 'brand', 'category', 'description', 'predictedRating')
recommendationsWithMeta.show(20, False)

+----------+---------------------------------------------+-----------------------------+-----------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Let's look at reviewer A5RHZE7B8SV5Q's actual preference
reviewDF.join(metaDF, on = 'asin', how = 'left').filter("reviewerID = 'A5RHZE7B8SV5Q'").select('asin','brand', 'category', 'description', 'overall').show(20, False)

+----------+-----------------------+----------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

The magazines recommended to reviewer A5RHZE7B8SV5Q fall under Cooking and Fashion & Style, which closely match the reviewer's actual preferences. The reviewer is also recommended Entertainment & Media and Art & Art History magazines, which could stem from the reviewer's interest in Men's Lifestyle content. Two of the recommended magazines are also from the brand 'Conde Nast Publications', which the reviewer already reads regularly.
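
The comparison above was done by eye. A minimal sketch of the same check in plain Python is shown below. The helper `category_overlap` is hypothetical, and the category lists are placeholders that loosely follow the discussion; the real values would come from the `category` column of `metaDF`.

```python
def category_overlap(recommended, reviewed):
    """Split recommended categories into already-reviewed and new ones."""
    recommended, reviewed = set(recommended), set(reviewed)
    return {
        "shared": sorted(recommended & reviewed),
        "new": sorted(recommended - reviewed),
    }


# Hypothetical category labels, not read from the dataset
recommended_categories = [
    "Cooking",
    "Fashion & Style",
    "Entertainment & Media",
    "Art & Art History",
]
reviewed_categories = ["Cooking", "Fashion & Style", "Men's Lifestyle"]

result = category_overlap(recommended_categories, reviewed_categories)
print(result)
```

Categories listed under `shared` support the claim that the recommendations match existing tastes, while those under `new` are the ones that need an explanation such as the Men's Lifestyle link suggested above.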