## Recommendation - Amazon - Toys and Games
*If you have one laptop/computer:*

Use the fields "reviewerID", "asin" (product ID), and "overall" (each user's 
rating of a product) from the dataset **reviews_Toys_and_Games_5.json.gz** to 
build a model that **predicts the overall rating** of products a user has not 
yet rated. 

Then make **recommendations** to some users: A3GJPLCZCDXXG6,
A34U85WY8ZWBPV, A2VIY2TL6QPYLG

In [1]:
import findspark 
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('recommendation').getOrCreate()

- Read the data

In [3]:
data = spark.read.json("Data/reviews_Toys_and_Games_5.json.gz")

In [4]:
data.show(5)

+----------+-------+-------+--------------------+-----------+--------------+--------------+--------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|  reviewerName|             summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------+--------------+--------------------+--------------+
|0439893577| [0, 0]|    5.0|I like the item p...|01 29, 2014|A1VXOAVRGKGEAK|         Angie|      Magnetic board|    1390953600|
|0439893577| [1, 1]|    4.0|Love the magnet e...|03 28, 2014| A8R62G708TSCM|       Candace|it works pretty g...|    1395964800|
|0439893577| [1, 1]|    5.0|Both sides are ma...|01 28, 2013|A21KH420DK0ICA|capemaychristy|          love this!|    1359331200|
|0439893577| [0, 0]|    5.0|Bought one a few ...| 02 8, 2014| AR29QK6HPFYZ4|          dcrm|   Daughters love it|    1391817600|
|0439893577| [1, 1]|    4.0|I have a stainles...| 05 5, 2014| ACCH8EOML6FN5|          DoyZ|Great to have

In [5]:
data.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [6]:
data_sub = data.select(['asin', 'overall', 'reviewerID'])

In [7]:
data_sub.count()

167597

In [8]:
# Distinct users and products
users = data_sub.select("reviewerID").distinct().count()
products = data_sub.select("asin").distinct().count()
numerator = data_sub.count()

In [9]:
display(numerator, users, products)

167597

19412

11924

- Check how sparse the ratings data is

In [10]:
# Number of ratings the matrix would contain if it had no empty cells
denominator = users * products
denominator

231468688

In [11]:
# Calculating sparsity 
sparsity = 1 - (numerator*1.0/denominator)
print("Sparsity: ", sparsity)

Sparsity:  0.9992759409428199
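
As a quick check on the figure above, the same arithmetic can be reproduced without Spark from the three counts the notebook prints. This is only a sketch; the variable names are illustrative and do not appear in the notebook.

```python
# Counts printed by the notebook cells above.
n_ratings = 167597   # rows in data_sub
n_users = 19412      # distinct reviewerID values
n_products = 11924   # distinct asin values

# Size of the full user x product matrix if every cell held a rating.
n_cells = n_users * n_products

# Sparsity is the share of cells that hold no rating.
density = n_ratings / n_cells
sparsity = 1 - density

# Average number of ratings per user and per product.
ratings_per_user = n_ratings / n_users
ratings_per_product = n_ratings / n_products

print(f"cells={n_cells}, sparsity={sparsity:.6f}")
print(f"ratings per user={ratings_per_user:.2f}, per product={ratings_per_product:.2f}")
```

Almost the entire matrix is empty, which is the setting where matrix factorization methods such as ALS are typically applied.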


- Prepare the data (convert string IDs to numeric indices)

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
# Import only the functions used below; a wildcard import would shadow Python built-ins such as sum and max
from pyspark.sql.functions import col, count, explode, when

In [14]:
# Create an indexer 
indexer = StringIndexer(inputCols=["asin", "reviewerID"],
                      outputCols=["asin_idx", "reviewerID_idx"])
final_data = indexer.fit(data_sub).transform(data_sub)

In [15]:
final_data.show(5)

+----------+-------+--------------+--------+--------------+
|      asin|overall|    reviewerID|asin_idx|reviewerID_idx|
+----------+-------+--------------+--------+--------------+
|0439893577|    5.0|A1VXOAVRGKGEAK|  2524.0|       14349.0|
|0439893577|    4.0| A8R62G708TSCM|  2524.0|       18115.0|
|0439893577|    5.0|A21KH420DK0ICA|  2524.0|        4454.0|
|0439893577|    5.0| AR29QK6HPFYZ4|  2524.0|       18990.0|
|0439893577|    4.0| ACCH8EOML6FN5|  2524.0|        2769.0|
+----------+-------+--------------+--------+--------------+
only showing top 5 rows
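
ALS needs numeric user and item columns, which is why the string IDs are indexed first. With its default `stringOrderType="frequencyDesc"`, `StringIndexer` gives index 0 to the most frequent label, and the Spark documentation describes ties as being broken alphabetically. The pure-Python sketch below imitates that ordering on a few made-up IDs; `build_index` is a hypothetical helper, not part of PySpark.

```python
from collections import Counter

def build_index(labels):
    """Map each label to a float index, most frequent label first.

    Ties are broken alphabetically, mirroring the frequencyDesc ordering.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Made-up reviewer IDs, for illustration only.
reviewers = ["U_b", "U_a", "U_b", "U_c", "U_b", "U_a"]
index = build_index(reviewers)
print(index)
```

Under this ordering, a low index would correspond to a reviewer or product with many reviews in the dataset.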



In [16]:
# Check for null values
final_data.select([count(when(col(x).isNull(), x)).alias(x) for x in final_data.columns]).toPandas().T

Unnamed: 0,0
asin,0
overall,0
reviewerID,0
asin_idx,0
reviewerID_idx,0


- Train-test split

In [20]:
train, test = final_data.randomSplit([0.8, 0.2])
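
`randomSplit` is called without a seed here, so the rows that land in `train` and `test`, and therefore the RMSE values reported later, will differ between runs. The sketch below illustrates the idea of a seeded, per-row random split in plain Python; `split_rows` is a hypothetical helper and is not how Spark implements `randomSplit` internally.

```python
import random

def split_rows(rows, train_fraction=0.8, seed=42):
    """Assign each row to train or test with a seeded random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # stand-in for DataFrame rows
train_a, test_a = split_rows(rows, seed=42)
train_b, test_b = split_rows(rows, seed=42)

# Same seed, same split; the 80/20 ratio is only approximate.
print(len(train_a), len(test_a), train_a == train_b)
```

In PySpark the corresponding change would be to pass a seed, for example `final_data.randomSplit([0.8, 0.2], seed=42)`.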

- ALS model

In [21]:
als = ALS(maxIter=5,
          regParam=0.01,
          userCol="reviewerID_idx",
          itemCol="asin_idx",
          ratingCol="overall",
          coldStartStrategy="drop",
          nonnegative=True)

In [22]:
model = als.fit(train)
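
ALS factorizes the ratings matrix into one latent vector per user and one per product, and a predicted rating is the dot product of the two. The sketch below shows only that prediction step, with made-up three-dimensional factors; in the fitted model the real vectors are exposed as `model.userFactors` and `model.itemFactors`, and the rank defaults to 10 when it is not set.

```python
def predict_rating(user_vec, item_vec):
    """Predicted rating = dot product of user and item latent factors."""
    if len(user_vec) != len(item_vec):
        raise ValueError("factor vectors must have the same length")
    return sum(u * v for u, v in zip(user_vec, item_vec))

# Hypothetical latent factors (rank 3), for illustration only.
user_factors = {26: [1.2, 0.8, 0.5]}
item_factors = {7262: [2.0, 1.5, 1.0], 8816: [1.0, 1.0, 1.0]}

for item_id, item_vec in item_factors.items():
    score = predict_rating(user_factors[26], item_vec)
    print(item_id, round(score, 3))
```

Nothing in this product bounds the result to the 1 to 5 rating scale, which is why predictions above 5 can appear in the outputs.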

In [23]:
# Evaluate the model 
predictions = model.transform(test)

In [25]:
predictions.select("prediction", "overall").show(5)

+----------+-------+
|prediction|overall|
+----------+-------+
| 2.9577603|    3.0|
| 3.8887873|    4.0|
|  4.822749|    5.0|
|  3.756661|    5.0|
| 6.2256937|    5.0|
+----------+-------+
only showing top 5 rows



In [26]:
evaluator = RegressionEvaluator(labelCol="overall",
                                metricName="rmse")
rmse = evaluator.evaluate(predictions)

In [27]:
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.781429128192685
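
The RMSE reported by `RegressionEvaluator` is the square root of the mean squared difference between `prediction` and `overall`. As a worked example, the sketch below applies that formula to the five rows shown earlier in the `predictions` output, so the number differs from the full test-set RMSE. It also shows the effect of clipping predictions to the 1 to 5 rating scale; the clipping step is an assumption and is not part of the notebook.

```python
import math

def rmse(predictions, labels):
    """Root-mean-square error between two equal-length sequences."""
    squared = [(p - y) ** 2 for p, y in zip(predictions, labels)]
    return math.sqrt(sum(squared) / len(squared))

# The five (prediction, overall) pairs displayed in the notebook output.
preds = [2.9577603, 3.8887873, 4.822749, 3.756661, 6.2256937]
labels = [3.0, 4.0, 5.0, 5.0, 5.0]

raw = rmse(preds, labels)

# Assumption: ratings live on a 1-5 scale, so out-of-range predictions are clipped.
clipped_preds = [min(5.0, max(1.0, p)) for p in preds]
clipped = rmse(clipped_preds, labels)

print(f"raw RMSE={raw:.4f}, clipped RMSE={clipped:.4f}")
```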


- Tune the parameters

In [52]:
als_t = ALS(maxIter=10, 
          regParam=0.2,
          userCol="reviewerID_idx",
          itemCol="asin_idx",
          ratingCol="overall",
          coldStartStrategy="drop", 
          nonnegative=True)
model_t = als_t.fit(train)

In [53]:
# Evaluate the model by computing the RMSE on the test data
predictions_t = model_t.transform(test)

In [54]:
rmse_t = evaluator.evaluate(predictions_t)
rmse_t

1.053620461806695

The RMSE is lower (about 1.05 versus 1.78), so model_t is the better model and is used from here on.
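
Choosing between the two configurations comes down to picking the one with the lowest test RMSE. The sketch below does that selection over the two (maxIter, regParam) pairs tried in this notebook, using the RMSE values reported above; a broader search would add more entries to the dictionary.

```python
# (maxIter, regParam) -> test RMSE, as reported in the notebook.
results = {
    (5, 0.01): 1.781429128192685,
    (10, 0.2): 1.053620461806695,
}

# Pick the configuration with the smallest RMSE.
best_params = min(results, key=results.get)
best_rmse = results[best_params]

# Relative improvement over the worst configuration.
worst_rmse = max(results.values())
improvement = (worst_rmse - best_rmse) / worst_rmse

print(f"best maxIter={best_params[0]}, regParam={best_params[1]}, RMSE={best_rmse:.4f}")
print(f"relative improvement: {improvement:.1%}")
```

Because both models are scored on the same test split that is also used to report the final result, the chosen RMSE is likely somewhat optimistic. A separate validation split, or `CrossValidator` with `ParamGridBuilder` from `pyspark.ml.tuning`, would give a cleaner comparison.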

- Generate recommendations for all users

In [56]:
# Get the 10 products with the highest predicted rating for each user
user_recs = model_t.recommendForAllUsers(10)

In [57]:
user_recs.printSchema()

root
 |-- reviewerID_idx: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- asin_idx: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [58]:
for user in user_recs.head(3):
    print(user)

Row(reviewerID_idx=26, recommendations=[Row(asin_idx=7262, rating=5.084777355194092), Row(asin_idx=8816, rating=5.082546710968018), Row(asin_idx=10827, rating=5.005214214324951), Row(asin_idx=8309, rating=4.987359046936035), Row(asin_idx=10259, rating=4.954450607299805), Row(asin_idx=8537, rating=4.9374871253967285), Row(asin_idx=11914, rating=4.937190532684326), Row(asin_idx=5117, rating=4.936103343963623), Row(asin_idx=11123, rating=4.908851623535156), Row(asin_idx=9254, rating=4.893795490264893)])
Row(reviewerID_idx=27, recommendations=[Row(asin_idx=8816, rating=5.499844551086426), Row(asin_idx=8309, rating=5.49474573135376), Row(asin_idx=7262, rating=5.455920219421387), Row(asin_idx=8610, rating=5.426054000854492), Row(asin_idx=9687, rating=5.408535003662109), Row(asin_idx=6042, rating=5.406440734863281), Row(asin_idx=11749, rating=5.383360385894775), Row(asin_idx=11569, rating=5.37180233001709), Row(asin_idx=9284, rating=5.361762046813965), Row(asin_idx=11829, rating=5.35885810852

In [59]:
# Flatten: one row per (user, recommended product)
result = user_recs.select(user_recs.reviewerID_idx, explode(user_recs.recommendations))

In [60]:
result = result.withColumn("productId", result.col.getField("asin_idx"))\
                .withColumn("rating", result.col.getField("rating"))
result.show()

+--------------+------------------+---------+---------+
|reviewerID_idx|               col|productId|   rating|
+--------------+------------------+---------+---------+
|            26| {7262, 5.0847774}|     7262|5.0847774|
|            26| {8816, 5.0825467}|     8816|5.0825467|
|            26| {10827, 5.005214}|    10827| 5.005214|
|            26|  {8309, 4.987359}|     8309| 4.987359|
|            26|{10259, 4.9544506}|    10259|4.9544506|
|            26|  {8537, 4.937487}|     8537| 4.937487|
|            26|{11914, 4.9371905}|    11914|4.9371905|
|            26| {5117, 4.9361033}|     5117|4.9361033|
|            26|{11123, 4.9088516}|    11123|4.9088516|
|            26| {9254, 4.8937955}|     9254|4.8937955|
|            27| {8816, 5.4998446}|     8816|5.4998446|
|            27| {8309, 5.4947457}|     8309|5.4947457|
|            27|   {7262, 5.45592}|     7262|  5.45592|
|            27|  {8610, 5.426054}|     8610| 5.426054|
|            27|  {9687, 5.408535}|     9687| 5.
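
`explode` turns each element of the `recommendations` array into its own row, repeating the user index alongside it. A plain-Python sketch of the same reshaping, using the first three recommendations for users 26 and 27 as displayed in the output above:

```python
# Nested form: one entry per user, each holding a list of (asin_idx, rating).
user_recs = [
    (26, [(7262, 5.0847774), (8816, 5.0825467), (10827, 5.005214)]),
    (27, [(8816, 5.4998446), (8309, 5.4947457), (7262, 5.45592)]),
]

# Flat form: one (reviewerID_idx, productId, rating) row per recommendation.
flat_rows = [
    (user_idx, asin_idx, rating)
    for user_idx, recs in user_recs
    for asin_idx, rating in recs
]

for row in flat_rows:
    print(row)
```

Note that `productId` in the flattened table is still the numeric `asin_idx`, not the original `asin` string.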

- Make **recommendations** to some users: A3GJPLCZCDXXG6,
A34U85WY8ZWBPV, A2VIY2TL6QPYLG

In [63]:
df_reviewer = final_data.select('reviewerID_idx', 'reviewerID').distinct()

In [64]:
df_reviewer.count()

19412

In [65]:
df_reviewer.show(5)

+--------------+--------------+
|reviewerID_idx|    reviewerID|
+--------------+--------------+
|        1688.0|A32EBQDMOPEJHE|
|        4094.0| AJ36J4LKI6M0K|
|       15847.0|A2R4AIJZR65WFG|
|       15154.0|A2CV5DM78XPO3K|
|        1155.0|A2R8R97INVXBR1|
+--------------+--------------+
only showing top 5 rows



In [66]:
df_asin = final_data.select('asin_idx', 'asin').distinct()

In [67]:
df_asin.count()

11924

In [68]:
df_asin.show(5)

+--------+----------+
|asin_idx|      asin|
+--------+----------+
|  9630.0|1603800689|
|    19.0|B00000K3BR|
|  3225.0|B00001ZT4D|
|  2347.0|B00006JBKT|
|  1600.0|B00012TGL6|
+--------+----------+
only showing top 5 rows



In [70]:
new_user_recs = user_recs.join(df_reviewer, on=['reviewerID_idx'], how='left')

In [71]:
new_user_recs.show(10)

+--------------+--------------------+--------------+
|reviewerID_idx|     recommendations|    reviewerID|
+--------------+--------------------+--------------+
|            26|[{7262, 5.0847774...|A15D2X8MICR2VQ|
|            27|[{8816, 5.4998446...| ALDAF4VVLFRHP|
|            28|[{10259, 4.754696...|A1UP19XQH91JT0|
|            31|[{8816, 5.552029}...|A1RKCT4H3X3J1W|
|            34|[{7262, 4.766273}...|A1FQNNX80WYWKR|
|            44|[{11550, 5.085070...|A2HZKWV36U9SXM|
|            53|[{8309, 4.900084}...|A23KACXOE9O9TX|
|            65|[{7262, 5.470727}...|A3R01WHD75L6FG|
|            76|[{8612, 4.959904}...| A6VSWJVTWEOII|
|            78|[{7262, 5.1724234...|A2BZ16RKE13PKV|
+--------------+--------------------+--------------+
only showing top 10 rows



In [74]:
# Recommendation for reviewerID = 'A3GJPLCZCDXXG6'
reviewerID = 'A3GJPLCZCDXXG6'
find_user_rec = new_user_recs.filter(new_user_recs['reviewerID'] == reviewerID)
user = find_user_rec.first()
lst = []
for row in user['recommendations']:
    row_f = df_asin.filter(df_asin.asin_idx == row['asin_idx'])
    row_f_first = row_f.first()
    lst.append((row['asin_idx'], row_f_first['asin'], row['rating']))
dic_user_rec = {'reviewerID' : user.reviewerID, 'recommendations' :lst}

In [75]:
dic_user_rec

{'reviewerID': 'A3GJPLCZCDXXG6',
 'recommendations': [(11749, 'B00C6PUTDK', 5.6095170974731445),
  (8309, 'B000HSVAQ8', 5.569020748138428),
  (8610, 'B001W30E14', 5.560372352600098),
  (8816, 'B003F2636A', 5.545396327972412),
  (5301, 'B00F6T8NYU', 5.539116859436035),
  (11699, 'B00BCJLWZU', 5.493358612060547),
  (11569, 'B009DQGLFA', 5.487114906311035),
  (9254, 'B007CB7X1E', 5.4832763671875),
  (9687, 'B00005BMKX', 5.478617191314697),
  (7262, 'B001JQLJNQ', 5.470534324645996)]}

In [76]:
# Recommendation for reviewerID = 'A34U85WY8ZWBPV'
reviewerID = 'A34U85WY8ZWBPV'
find_user_rec = new_user_recs.filter(new_user_recs['reviewerID'] == reviewerID)
user = find_user_rec.first()
lst = []
for row in user['recommendations']:
    row_f = df_asin.filter(df_asin.asin_idx == row['asin_idx'])
    row_f_first = row_f.first()
    lst.append((row['asin_idx'], row_f_first['asin'], row['rating']))
dic_user_rec2 = {'reviewerID' : user.reviewerID, 'recommendations' :lst}

In [77]:
dic_user_rec2

{'reviewerID': 'A34U85WY8ZWBPV',
 'recommendations': [(10827, 'B003O3FYP6', 5.406717300415039),
  (8537, 'B001GQHS3Y', 5.398741722106934),
  (11123, 'B004Y8TF96', 5.392770767211914),
  (3925, 'B000GKW6FQ', 5.359477996826172),
  (7262, 'B001JQLJNQ', 5.347270965576172),
  (4691, 'B00CM5D8QE', 5.330094337463379),
  (9850, 'B0007N697S', 5.276577472686768),
  (10259, 'B0013E5HW8', 5.261631011962891),
  (9448, 'B00AZZ0F4Q', 5.240566730499268),
  (11268, 'B00633HCFE', 5.200982093811035)]}

In [78]:
# Recommendation for reviewerID = 'A2VIY2TL6QPYLG'
reviewerID = 'A2VIY2TL6QPYLG'
find_user_rec = new_user_recs.filter(new_user_recs['reviewerID'] == reviewerID)
user = find_user_rec.first()
lst = []
for row in user['recommendations']:
    row_f = df_asin.filter(df_asin.asin_idx == row['asin_idx'])
    row_f_first = row_f.first()
    lst.append((row['asin_idx'], row_f_first['asin'], row['rating']))
dic_user_rec3 = {'reviewerID' : user.reviewerID, 'recommendations' :lst}

In [79]:
dic_user_rec3

{'reviewerID': 'A2VIY2TL6QPYLG',
 'recommendations': [(7262, 'B001JQLJNQ', 5.6590447425842285),
  (9928, 'B000BN8XLY', 5.48148775100708),
  (8917, 'B0045O75AU', 5.443199634552002),
  (10827, 'B003O3FYP6', 5.382073402404785),
  (11749, 'B00C6PUTDK', 5.37076997756958),
  (9254, 'B007CB7X1E', 5.35860013961792),
  (11699, 'B00BCJLWZU', 5.335951805114746),
  (9800, 'B0002HYHT6', 5.320662021636963),
  (9669, 'B00003GPDQ', 5.306426048278809),
  (6255, 'B00134TC60', 5.298676490783691)]}
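
The three cells above repeat the same lookup with a different `reviewerID`. One way to avoid the repetition is a small helper that maps indices back to IDs through dictionaries. The sketch below is plain Python over a tiny excerpt of the values shown in this notebook (ratings rounded to four decimals); `recommend_for` and the two dictionaries are hypothetical names, and in the notebook the mappings would come from `df_asin` and `new_user_recs`.

```python
# asin_idx -> asin, excerpted from the recommendation output above.
asin_by_idx = {
    11749: "B00C6PUTDK",
    8309: "B000HSVAQ8",
    8610: "B001W30E14",
    7262: "B001JQLJNQ",
    9928: "B000BN8XLY",
}

# reviewerID -> top (asin_idx, rating) pairs, excerpted and rounded.
recs_by_reviewer = {
    "A3GJPLCZCDXXG6": [(11749, 5.6095), (8309, 5.5690), (8610, 5.5604)],
    "A2VIY2TL6QPYLG": [(7262, 5.6590), (9928, 5.4815)],
}

def recommend_for(reviewer_id):
    """Return {'reviewerID', 'recommendations'} or None for unknown users."""
    recs = recs_by_reviewer.get(reviewer_id)
    if recs is None:
        # No recommendations available for this reviewer.
        return None
    return {
        "reviewerID": reviewer_id,
        "recommendations": [(idx, asin_by_idx[idx], rating) for idx, rating in recs],
    }

for reviewer_id in ["A3GJPLCZCDXXG6", "A2VIY2TL6QPYLG", "UNKNOWN_USER"]:
    print(recommend_for(reviewer_id))
```

The `None` branch matters because `first()` in the cells above returns `None` when no row matches the filter, and indexing into that result would raise a `TypeError`.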