In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [3]:
sc = SparkContext()

In [4]:
spark = SparkSession(sc)

### Read the dataset

In [5]:
data = spark.read.json('Du_lieu_cung_cap/reviews_Toys_and_Games_5.json.gz')

In [6]:
data.show(10, truncate=True)

+----------+--------+-------+--------------------+-----------+--------------+--------------+--------------------+--------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|  reviewerName|             summary|unixReviewTime|
+----------+--------+-------+--------------------+-----------+--------------+--------------+--------------------+--------------+
|0439893577|  [0, 0]|    5.0|I like the item p...|01 29, 2014|A1VXOAVRGKGEAK|         Angie|      Magnetic board|    1390953600|
|0439893577|  [1, 1]|    4.0|Love the magnet e...|03 28, 2014| A8R62G708TSCM|       Candace|it works pretty g...|    1395964800|
|0439893577|  [1, 1]|    5.0|Both sides are ma...|01 28, 2013|A21KH420DK0ICA|capemaychristy|          love this!|    1359331200|
|0439893577|  [0, 0]|    5.0|Bought one a few ...| 02 8, 2014| AR29QK6HPFYZ4|          dcrm|   Daughters love it|    1391817600|
|0439893577|  [1, 1]|    4.0|I have a stainles...| 05 5, 2014| ACCH8EOML6FN5|          DoyZ|Great

In [7]:
data.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



### Data preprocessing

In [8]:
data_sub = data.select(['asin', 'overall', 'reviewerID'])

In [9]:
data_sub.count()

167597

In [10]:
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import isnan, when, count, col

In [11]:
data_sub.show(5, truncate=True)

+----------+-------+--------------+
|      asin|overall|    reviewerID|
+----------+-------+--------------+
|0439893577|    5.0|A1VXOAVRGKGEAK|
|0439893577|    4.0| A8R62G708TSCM|
|0439893577|    5.0|A21KH420DK0ICA|
|0439893577|    5.0| AR29QK6HPFYZ4|
|0439893577|    4.0| ACCH8EOML6FN5|
+----------+-------+--------------+
only showing top 5 rows



In [12]:
data_sub.select([count(when(col(c).isNull(), c)).alias(c) for c in
                data_sub.columns]).toPandas()

Unnamed: 0,asin,overall,reviewerID
0,0,0,0


- Không có dữ liệu Null

In [13]:
# Check duplicate
dup_rows = data.count() - data.distinct().count()
dup_rows

0

- Không có dữ liệu bị duplicated

In [14]:
data.select('overall').describe().show()

+-------+------------------+
|summary|           overall|
+-------+------------------+
|  count|            167597|
|   mean| 4.356307093802395|
| stddev|0.9935012992131987|
|    min|               1.0|
|    max|               5.0|
+-------+------------------+



In [15]:
# Distinct users and movies
users = data.select('reviewerID').distinct().count()
products = data.select('asin').distinct().count()
numerator = data.count()

In [16]:
display(numerator, users, products)

167597

19412

11924

In [17]:
# Number of ratings matrix could contain if no empty cells
denominator = users * products
denominator

231468688

In [18]:
#Calculating sparsity
sparsity = 1 - (numerator*1.0 / denominator)
print ("Sparsity: "), sparsity

Sparsity: 


(None, 0.9992759409428199)

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [20]:
# Converting String to index
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [21]:
# Create an indexer
indexer = StringIndexer(inputCol='asin',
                        outputCol='asin_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(data_sub)

# Indexer creates a new column with numeric index values
data_indexed = indexer_model.transform(data_sub)

# Repeat the process for the other categorical feature
indexer1 = StringIndexer(inputCol='reviewerID',
                        outputCol='reviewerID_idx')
indexer1_model = indexer1.fit(data_indexed)
data_indexed = indexer1_model.transform(data_indexed)

In [22]:
data_indexed.show(5, truncate=True)

+----------+-------+--------------+--------+--------------+
|      asin|overall|    reviewerID|asin_idx|reviewerID_idx|
+----------+-------+--------------+--------+--------------+
|0439893577|    5.0|A1VXOAVRGKGEAK|  2524.0|       14349.0|
|0439893577|    4.0| A8R62G708TSCM|  2524.0|       18115.0|
|0439893577|    5.0|A21KH420DK0ICA|  2524.0|        4454.0|
|0439893577|    5.0| AR29QK6HPFYZ4|  2524.0|       18990.0|
|0439893577|    4.0| ACCH8EOML6FN5|  2524.0|        2769.0|
+----------+-------+--------------+--------+--------------+
only showing top 5 rows



In [23]:
data_indexed.select([count(when(col(c).isNull(), c)).alias(c) for c in
                    data_indexed.columns]).toPandas().T

Unnamed: 0,0
asin,0
overall,0
reviewerID,0
asin_idx,0
reviewerID_idx,0


- Data không có dữ liệu null

### Chia dữ liệu train-test

In [24]:
(training, test) = data_indexed.randomSplit([0.8, 0.2])

### Xây dựng model

In [25]:
# Creating ALS model and fitting data
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [26]:
als = ALS(maxIter=5,
          regParam=0.09,
          rank=25,
          userCol='reviewerID_idx',
          itemCol='asin_idx',
          ratingCol='overall',
          coldStartStrategy='drop',
          nonnegative=True)
model = als.fit(training)

### Đánh giá kết quả

In [27]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)

In [28]:
predictions.select(["asin_idx", "reviewerID_idx",
                    "overall", "prediction",]).show(5)

+--------+--------------+-------+----------+
|asin_idx|reviewerID_idx|overall|prediction|
+--------+--------------+-------+----------+
|    45.0|       13623.0|    5.0| 3.8328078|
|  8056.0|        6397.0|    5.0| 2.6199267|
|    15.0|       18911.0|    5.0| 3.6369438|
|  1907.0|        7253.0|    5.0|  4.101487|
|  2538.0|         463.0|    5.0|  5.185304|
+--------+--------------+-------+----------+
only showing top 5 rows



In [29]:
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol="overall",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.1328857557045706


- This model is ~ 1.12 from perfect recommendations.

### Tuning parameters

In [30]:
als_t = ALS(maxIter=10,
          regParam=0.9,
          userCol='reviewerID_idx',
          itemCol='asin_idx',
          ratingCol='overall',
          coldStartStrategy='drop',
          nonnegative=True
         )
model_t = als_t.fit(training)

In [31]:
# Evaluate the model by computing the RMSE on the test data
predictions_t = model_t.transform(test)

In [32]:
rmse_t = evaluator.evaluate(predictions_t)
print('Root-mean-square-error = ', str(rmse_t))

Root-mean-square-error =  1.283256734084161


- Select model vì rmse của model ~1.12 nhỏ hơn model_t ~1.26

### Recommend for specific users

In [33]:
# Get 10 recommendations which have the highest rating
user_recs = model.recommendForAllUsers(10)

In [34]:
user_recs.printSchema()

root
 |-- reviewerID_idx: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- asin_idx: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [35]:
for user in user_recs.head(3):
    print(user)

Row(reviewerID_idx=26, recommendations=[Row(asin_idx=9850, rating=6.742778301239014), Row(asin_idx=9486, rating=6.539072036743164), Row(asin_idx=6836, rating=6.387267589569092), Row(asin_idx=3493, rating=6.36586856842041), Row(asin_idx=4214, rating=6.262614727020264), Row(asin_idx=7983, rating=6.235605239868164), Row(asin_idx=8309, rating=6.221793174743652), Row(asin_idx=10928, rating=6.215353965759277), Row(asin_idx=6657, rating=6.215272903442383), Row(asin_idx=4168, rating=6.19683313369751)])
Row(reviewerID_idx=27, recommendations=[Row(asin_idx=7983, rating=7.132510185241699), Row(asin_idx=6836, rating=7.098873138427734), Row(asin_idx=8870, rating=7.023540496826172), Row(asin_idx=3493, rating=6.98689603805542), Row(asin_idx=4724, rating=6.969473838806152), Row(asin_idx=9486, rating=6.963817119598389), Row(asin_idx=11870, rating=6.961329936981201), Row(asin_idx=9425, rating=6.900588035583496), Row(asin_idx=7511, rating=6.895907878875732), Row(asin_idx=8158, rating=6.880486965179443)])

### Đưa ra đề xuất cho một user cụ thể

In [36]:
# create a df of distinct 'reviewerID_idx' & 'reviewerID'
df_reviewer_reviewer_id = data_indexed.select('reviewerID_idx', 'reviewerID').distinct()
df_reviewer_reviewer_id.show(5)

+--------------+--------------+
|reviewerID_idx|    reviewerID|
+--------------+--------------+
|        1688.0|A32EBQDMOPEJHE|
|        4094.0| AJ36J4LKI6M0K|
|       15847.0|A2R4AIJZR65WFG|
|       15154.0|A2CV5DM78XPO3K|
|        1155.0|A2R8R97INVXBR1|
+--------------+--------------+
only showing top 5 rows



In [37]:
# join with user_recs
new_user_recs = user_recs.join(df_reviewer_reviewer_id, on='reviewerID_idx', how='left')

In [38]:
# create a df of distinct 'asin_idx' & 'asin'
df_asin_asin_idx = data_indexed.select('asin_idx', 'asin').distinct()
df_asin_asin_idx.show(5)

+--------+----------+
|asin_idx|      asin|
+--------+----------+
|  9630.0|1603800689|
|    19.0|B00000K3BR|
|  3225.0|B00001ZT4D|
|  2347.0|B00006JBKT|
|  1600.0|B00012TGL6|
+--------+----------+
only showing top 5 rows



In [39]:
# Recommendation for reviewerID = 'A3GJPLCZCDXXG6'
reviewerID_1 = 'A3GJPLCZCDXXG6'
find_user_rec = new_user_recs.filter(new_user_recs['reviewerID'] == reviewerID_1)
user = find_user_rec.first() 
lst = []
for row in user['recommendations']:   
    row_f = df_asin_asin_idx.filter(df_asin_asin_idx.asin_idx == row['asin_idx'])  
    row_f_first = row_f.first()
    lst.append((row['asin_idx'], row_f_first['asin'], row['rating']))
dic_user_rec_1 = {'reviewerID' : user.reviewerID, 'recommendations' :lst}

In [40]:
dic_user_rec_1

{'reviewerID': 'A3GJPLCZCDXXG6',
 'recommendations': [(3493, 'B00D95E30G', 6.640453338623047),
  (11870, 'B00FHFBQIS', 6.152516841888428),
  (10770, 'B003F2VNGK', 6.144981384277344),
  (4168, 'B0087JGGX6', 6.114174842834473),
  (11443, 'B00847GS48', 6.032721996307373),
  (6836, 'B00D3Y18WO', 6.031802654266357),
  (7983, 'B00CUB6UCE', 6.006269454956055),
  (11622, 'B00A88G6V6', 5.988472938537598),
  (5183, 'B0087JD10C', 5.956704139709473),
  (4214, 'B00CPIYY0W', 5.903722763061523)]}

In [41]:
# Recommendation for reviewerID = 'A34U85WY8ZWBPV'
reviewerID_2 = 'A34U85WY8ZWBPV'
find_user_rec = new_user_recs.filter(new_user_recs['reviewerID'] == reviewerID_2)
user = find_user_rec.first() 
lst = []
for row in user['recommendations']:   
    row_f = df_asin_asin_idx.filter(df_asin_asin_idx.asin_idx == row['asin_idx'])  
    row_f_first = row_f.first()
    lst.append((row['asin_idx'], row_f_first['asin'], row['rating']))
dic_user_rec_2 = {'reviewerID' : user.reviewerID, 'recommendations' :lst}

In [42]:
dic_user_rec_2

{'reviewerID': 'A34U85WY8ZWBPV',
 'recommendations': [(3493, 'B00D95E30G', 5.5434980392456055),
  (9866, 'B0007XIZ0M', 5.524738311767578),
  (6836, 'B00D3Y18WO', 5.505463123321533),
  (9850, 'B0007N697S', 5.397765636444092),
  (9254, 'B007CB7X1E', 5.351941108703613),
  (9915, 'B000B652ZQ', 5.341699123382568),
  (8158, 'B00027P7SQ', 5.303812503814697),
  (8222, 'B0007XIZ0C', 5.2991862297058105),
  (5377, 'B0001Y6IIS', 5.298669815063477),
  (9814, 'B00030LQK0', 5.2850446701049805)]}

In [43]:
# Recommendation for reviewerID = 'A2VIY2TL6QPYLG'
reviewerID_3 = 'A2VIY2TL6QPYLG'
find_user_rec = new_user_recs.filter(new_user_recs['reviewerID'] == reviewerID_3)
user = find_user_rec.first() 
lst = []
for row in user['recommendations']:   
    row_f = df_asin_asin_idx.filter(df_asin_asin_idx.asin_idx == row['asin_idx'])  
    row_f_first = row_f.first()
    lst.append((row['asin_idx'], row_f_first['asin'], row['rating']))
dic_user_rec_3 = {'reviewerID' : user.reviewerID, 'recommendations' :lst}

In [44]:
dic_user_rec_3

{'reviewerID': 'A2VIY2TL6QPYLG',
 'recommendations': [(11607, 'B00A0GNOVQ', 6.817030906677246),
  (9847, 'B0007DI63S', 6.733384132385254),
  (5377, 'B0001Y6IIS', 6.511037349700928),
  (5390, 'B00062X6KS', 6.459409713745117),
  (8222, 'B0007XIZ0C', 6.421385288238525),
  (11198, 'B005LAZDMY', 6.373732566833496),
  (8158, 'B00027P7SQ', 6.351343631744385),
  (8224, 'B0007XIZ4I', 6.348474025726318),
  (4168, 'B0087JGGX6', 6.346649646759033),
  (7983, 'B00CUB6UCE', 6.3413214683532715)]}

- user A3GJPLCZCDXXG6 is recommended to buy product with productId = B00D95E30G, B00FHFBQIS,...
- user A34U85WY8ZWBPV is recommended to buy product with productId = B0007XIZ0M, B00D3Y18WO,...
- user A2VIY2TL6QPYLG is recommended to buy product with productId = B00A0GNOVQ, B0007DI63S,...