# Project : Amazon Video Games

A recommendation system is an algorithm that predicts a user's preferences or ratings for an item based on their past behavior, interactions, or the behavior of similar users.

In this project, we will use the **reviews_Video_Games_5.json.gz** dataset, which contains video game reviews from Amazon. Specifically, we will focus on the "reviewerID", "asin" (ProductID), and "overall" (user ratings for each product) attributes in the dataset. The goal is to build a model that can predict the "overall" ratings for products that a user has not yet interacted with. This approach aims to provide personalized product recommendations, thereby enhancing user engagement and satisfaction.

Source : https://cseweb.ucsd.edu/%7Ejmcauley/datasets/amazon/links.html

In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
SparkContext.setSystemProperty('spark.hadoop.dfs.client.use.datanode.hostname', 'true')
sc=SparkContext(master='local', appName='New Spark Context')
sc

In [2]:
from pyspark.sql import SparkSession
spark=SparkSession(sc)
spark

In [3]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer

In [None]:
data=spark.read.json("D:/DS/Video_Games_5.json")
data.show(3)

+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|0700099867|[8, 12]|    1.0|Installing the ga...| 07 9, 2012|A2HD75EMZR8QLN|                 123|Pay to unlock con...|    1341792000|
|0700099867| [0, 0]|    4.0|If you like rally...|06 30, 2013|A3UR8NLLY1ZHCX|Alejandro Henao "...|     Good rally game|    1372550400|
|0700099867| [0, 0]|    1.0|1st shipment rece...|06 28, 2014|A1INA0F5CWW3J4|Amazon Shopper "M...|           Wrong key|    1403913600|
+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
only showing top 3 rows



In [5]:
data_sub = data.select(['asin', 'overall', 'reviewerID'])
data_sub.show(5)

+----------+-------+--------------+
|      asin|overall|    reviewerID|
+----------+-------+--------------+
|0700099867|    1.0|A2HD75EMZR8QLN|
|0700099867|    4.0|A3UR8NLLY1ZHCX|
|0700099867|    1.0|A1INA0F5CWW3J4|
|0700099867|    3.0|A1DLMTOTHQ4AST|
|0700099867|    4.0|A361M14PU2GUEG|
+----------+-------+--------------+
only showing top 5 rows



In [6]:
data_sub.count()

231780

In [7]:
data_sub.printSchema()

root
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewerID: string (nullable = true)



In [8]:
data_sub = data_sub.withColumn("overall", data_sub["overall"].cast(DoubleType()))

In [9]:
data_sub.select([count(when(isnull(c), c)).alias(c) for c in data_sub.columns]).toPandas().T

Unnamed: 0,0
asin,0
overall,0
reviewerID,0


In [10]:
data_sub.select([count(when(isnan(c), c)).alias(c) for c in data_sub.columns]).toPandas().T

Unnamed: 0,0
asin,0
overall,0
reviewerID,0


In [11]:
users = data_sub.select("reviewerID").distinct().count()
products = data_sub.select("asin").distinct().count()
numerator = data_sub.count()
users, products, numerator

(24303, 10672, 231780)

In [12]:
# Sparsity
sparsity = 1 - (numerator / (users * products))
print ("Sparsity: ", sparsity)

Sparsity:  0.9991063442479476


In [13]:
data_indexed=StringIndexer(inputCol='asin', outputCol='asin_idx').fit(data_sub).transform(data_sub)
data_indexed=StringIndexer(inputCol='reviewerID', outputCol='reviewerID_idx').fit(data_indexed).transform(data_indexed)
data_indexed.show(5)

+----------+-------+--------------+--------+--------------+
|      asin|overall|    reviewerID|asin_idx|reviewerID_idx|
+----------+-------+--------------+--------+--------------+
|0700099867|    1.0|A2HD75EMZR8QLN|  2269.0|       14157.0|
|0700099867|    4.0|A3UR8NLLY1ZHCX|  2269.0|       22489.0|
|0700099867|    1.0|A1INA0F5CWW3J4|  2269.0|        7934.0|
|0700099867|    3.0|A1DLMTOTHQ4AST|  2269.0|        7852.0|
|0700099867|    4.0|A361M14PU2GUEG|  2269.0|         847.0|
+----------+-------+--------------+--------+--------------+
only showing top 5 rows



In [14]:
data_indexed.select([count(when(isnull(c), c)).alias(c) for c in data_sub.columns]).toPandas().T

Unnamed: 0,0
asin,0
overall,0
reviewerID,0


In [15]:
train, test = data_indexed.randomSplit([0.8, 0.2])

In [24]:
als = ALS(maxIter=10,
          regParam=0.1,
          rank = 15,
          userCol="reviewerID_idx",
          itemCol="asin_idx",
          ratingCol="overall",
          coldStartStrategy="drop",
          nonnegative=True)
model = als.fit(train)

In [25]:
predictions=model.transform(test)
predictions.show(5)

+----------+-------+--------------+--------+--------------+----------+
|      asin|overall|    reviewerID|asin_idx|reviewerID_idx|prediction|
+----------+-------+--------------+--------+--------------+----------+
|B00000DMAR|    5.0|A1QHGON6QDTX2K|  1621.0|       13285.0|  4.061522|
|B00000F1GM|    5.0|A2AV2TR28DGSGC|   290.0|        1645.0| 4.3437734|
|B00000K514|    4.0|A12WZTC4YJ8ZEC|  7388.0|         496.0|  4.405699|
|B00001LAE2|    4.0|A2NJO6YE954DBH|  2757.0|       20135.0|  3.443389|
|B00002SUOV|    5.0|A3PI78LW7ENR1C|   410.0|       15619.0| 3.8626418|
+----------+-------+--------------+--------+--------------+----------+
only showing top 5 rows



In [27]:
evaluator=RegressionEvaluator(metricName='rmse', labelCol='overall', predictionCol='prediction')
rmse=evaluator.evaluate(predictions)
rmse

1.2240971307543231

TUNNING PARAMETER

In [18]:
for regParam in [0.01, 0.1, 1]:
    for rank in [10, 20, 30, 40]:
        als = ALS(maxIter=10,
          regParam=regParam,
          rank = rank,
          userCol="reviewerID_idx",
          itemCol="asin_idx",
          ratingCol="overall",
          coldStartStrategy="drop",
          nonnegative=True)
        
        model = als.fit(train)

        predictions=model.transform(test)

        evaluator=RegressionEvaluator(metricName='rmse', labelCol='overall', predictionCol='prediction')
        rmse=evaluator.evaluate(predictions)
        print('With regParam =', regParam, ', rank =', rank, ': RSME =', rmse)

With regParam = 0.01 , rank = 10 : RSME = 1.620807270931743
With regParam = 0.01 , rank = 20 : RSME = 1.451252249996847
With regParam = 0.01 , rank = 30 : RSME = 1.388174280364535
With regParam = 0.01 , rank = 40 : RSME = 1.3722240661477678
With regParam = 0.1 , rank = 10 : RSME = 1.2549736164974294
With regParam = 0.1 , rank = 20 : RSME = 1.2184306907403926
With regParam = 0.1 , rank = 30 : RSME = 1.2104780686409016
With regParam = 0.1 , rank = 40 : RSME = 1.197064953244429
With regParam = 1 , rank = 10 : RSME = 1.4554353103825102
With regParam = 1 , rank = 20 : RSME = 1.4554411754813787
With regParam = 1 , rank = 30 : RSME = 1.4554599350746218
With regParam = 1 , rank = 40 : RSME = 1.4554582428447769


In [22]:
for maxIter in [15, 20]:
  for regParam in [0.1, 1]:
     for rank in [20, 25, 30, 35]:
          als = ALS(maxIter=10,
            regParam=regParam,
            rank = rank,
            userCol="reviewerID_idx",
            itemCol="asin_idx",
            ratingCol="overall",
            coldStartStrategy="drop",
            nonnegative=True)
        
          model = als.fit(train)

          predictions=model.transform(test)

          evaluator=RegressionEvaluator(metricName='rmse', labelCol='overall', predictionCol='prediction')
          rmse=evaluator.evaluate(predictions)
          print('With maxIter =', maxIter, ', regParam =', regParam, ', rank =', rank, ': RSME =', rmse)

With maxIter = 15 , regParam = 0.1 , rank = 20 : RSME = 1.2184306907403926
With maxIter = 15 , regParam = 0.1 , rank = 25 : RSME = 1.2119784954942823
With maxIter = 15 , regParam = 0.1 , rank = 30 : RSME = 1.2104780686409016
With maxIter = 15 , regParam = 0.1 , rank = 35 : RSME = 1.2021304854366643
With maxIter = 15 , regParam = 1 , rank = 20 : RSME = 1.4554411754813787
With maxIter = 15 , regParam = 1 , rank = 25 : RSME = 1.4554524621523863
With maxIter = 15 , regParam = 1 , rank = 30 : RSME = 1.4554599350746218
With maxIter = 15 , regParam = 1 , rank = 35 : RSME = 1.4554455312881636
With maxIter = 20 , regParam = 0.1 , rank = 20 : RSME = 1.2184306907403926
With maxIter = 20 , regParam = 0.1 , rank = 25 : RSME = 1.2119784954942823
With maxIter = 20 , regParam = 0.1 , rank = 30 : RSME = 1.2104780686409016
With maxIter = 20 , regParam = 0.1 , rank = 35 : RSME = 1.2021304854366643
With maxIter = 20 , regParam = 1 , rank = 20 : RSME = 1.4554411754813787
With maxIter = 20 , regParam = 1 , 

In [18]:
als = ALS(maxIter=20,
          regParam=0.3,
          rank = 35,
          userCol="reviewerID_idx",
          itemCol="asin_idx",
          ratingCol="overall",
          coldStartStrategy="drop",
          nonnegative=True)
model = als.fit(train)

predictions=model.transform(test)

evaluator=RegressionEvaluator(metricName='rmse', labelCol='overall', predictionCol='prediction')
rmse=evaluator.evaluate(predictions)
rmse

1.1769099417376128

Choose the model with maxIter=20, regParam=0.3, and rank=35, as it yields the lowest RMS

In [None]:
model.save("Saved_Model/Recommendation_Model")

In [19]:
user_recs = model.recommendForAllUsers(10)

In [21]:
user_recs.show(10, False)

+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|reviewerID_idx|recommendations                                                                                                                                                                                |
+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0             |[{5567, 5.170495}, {10395, 5.1384835}, {9536, 5.111155}, {10074, 5.0490193}, {5439, 5.0289917}, {8237, 5.0169296}, {9330, 5.0027604}, {6190, 4.979928}, {8750, 4.975552}, {4476, 4.967814}]    |
|1             |[{5567, 4.671962}, {9536, 4.6549263}, {10395, 4.6226745}, {8237, 4.5801897}, {10074, 4.5777617}, {8750, 4.5658355}, {4629, 4.5330887}, {4476, 4.5310

In [23]:
for user in user_recs.head(10):
    print(user)

Row(reviewerID_idx=0, recommendations=[Row(asin_idx=5567, rating=5.17049503326416), Row(asin_idx=10395, rating=5.13848352432251), Row(asin_idx=9536, rating=5.111155033111572), Row(asin_idx=10074, rating=5.0490193367004395), Row(asin_idx=5439, rating=5.02899169921875), Row(asin_idx=8237, rating=5.016929626464844), Row(asin_idx=9330, rating=5.002760410308838), Row(asin_idx=6190, rating=4.979928016662598), Row(asin_idx=8750, rating=4.975552082061768), Row(asin_idx=4476, rating=4.967813968658447)])
Row(reviewerID_idx=1, recommendations=[Row(asin_idx=5567, rating=4.671961784362793), Row(asin_idx=9536, rating=4.654926300048828), Row(asin_idx=10395, rating=4.622674465179443), Row(asin_idx=8237, rating=4.5801897048950195), Row(asin_idx=10074, rating=4.577761650085449), Row(asin_idx=8750, rating=4.565835475921631), Row(asin_idx=4629, rating=4.533088684082031), Row(asin_idx=4476, rating=4.531039237976074), Row(asin_idx=5439, rating=4.527980327606201), Row(asin_idx=3531, rating=4.482486248016357)

In [25]:
df_reviewer_reviewer_id = data_indexed.select('reviewerID_idx', 'reviewerID').distinct()
df_reviewer_reviewer_id.show(5)

+--------------+--------------+
|reviewerID_idx|    reviewerID|
+--------------+--------------+
|       20806.0|A2ZYJOZO6BPV6K|
|         735.0|A3TQTYD0D6AUO3|
|        2580.0|A2QVKLB1VT903K|
|        9117.0|A3OMBKL5EOHA36|
|        2945.0|A2NWQA506BES77|
+--------------+--------------+
only showing top 5 rows



In [26]:
df_asin_asin_idx = data_indexed.select('asin_idx', 'asin').distinct()
df_asin_asin_idx.show(5)

+--------+----------+
|asin_idx|      asin|
+--------+----------+
|   883.0|B000038IFX|
|  2005.0|B00005Q8J1|
|  4809.0|B00005YYFE|
|  3085.0|B00006F2ZR|
|  4821.0|B00007KUW5|
+--------+----------+
only showing top 5 rows



In [27]:
new_user_recs = user_recs.join(df_reviewer_reviewer_id, on=['reviewerID_idx'], how='left')
new_user_recs.show(10, truncate=False)

+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+
|reviewerID_idx|recommendations                                                                                                                                                                                |reviewerID    |
+--------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+
|8             |[{5567, 5.02801}, {10395, 5.025039}, {9536, 4.9933233}, {8237, 4.947788}, {8750, 4.899927}, {5439, 4.8990645}, {10074, 4.8756647}, {4629, 4.873579}, {8685, 4.83212}, {10268, 4.8257375}]      |A1AISPOIIHTHXX|
|0             |[{5567, 5.170495}, {10395, 5.1384835}, {9536, 5.111155}, {10074, 5.0490193}, {5439, 5.02

In [29]:
new_user_recs.write.parquet('Saved_Model/Question4/Video_Games_U.parquet', mode='overwrite')
df_asin_asin_idx.write.parquet('Saved_Model/Question4/Video_Games_P.parquet', mode='overwrite')

In [31]:
# Recommend for users
for reviewerID in ['A29KT7UP7DLM1J', 'A1WGVOVABHFDF3', 'A3DIS5O83SQJWW']:
    find_user_rec = new_user_recs.filter(new_user_recs['reviewerID'] == reviewerID)
    user = find_user_rec.first()
    lst = []
    for row in user['recommendations']:
        row_f = df_asin_asin_idx.filter(df_asin_asin_idx.asin_idx == row['asin_idx'])
        row_f_first = row_f.first()
        lst.append((row['asin_idx'], row_f_first['asin'], row['rating']))
    dic_user_rec = {'reviewerID' : user.reviewerID, 'recommendations' :lst}
    print(dic_user_rec)

{'reviewerID': 'A29KT7UP7DLM1J', 'recommendations': [(5567, 'B00002SVO9', 4.003811836242676), (9536, 'B00006IKBG', 3.9786391258239746), (5439, 'B004DGJP2G', 3.96638560295105), (7425, 'B000035XKX', 3.961329460144043), (1520, 'B000B6MLTG', 3.9610238075256348), (4629, 'B002LIT3F2', 3.9505438804626465), (3249, 'B00004WKHO', 3.93021559715271), (8237, 'B000035XGG', 3.9193837642669678), (10148, 'B0024FAXII', 3.8894078731536865), (6024, 'B0076RRYA4', 3.88702392578125)]}
{'reviewerID': 'A1WGVOVABHFDF3', 'recommendations': [(10395, 'B004VF06AY', 5.153601169586182), (8832, 'B001E1BNZU', 5.059384346008301), (8685, 'B000LWRMHQ', 5.035480976104736), (5567, 'B00002SVO9', 5.029973983764648), (9536, 'B00006IKBG', 5.021553039550781), (8237, 'B000035XGG', 5.011380672454834), (8750, 'B000WPTGOY', 4.966602802276611), (10074, 'B001E2UGVQ', 4.960259437561035), (3059, 'B00001X50L', 4.947412967681885), (8802, 'B001AZ7RK0', 4.919321060180664)]}
{'reviewerID': 'A3DIS5O83SQJWW', 'recommendations': [(5567, 'B00002