In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf

In [3]:
SparkContext.setSystemProperty('spark.executor.memory', '12g')
sc = SparkContext(master='local', appName='Recommendation_Beauty')

In [4]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import isnan, when, count, col, udf

In [5]:
spark = SparkSession(sc)

In [6]:
data = spark.read.json("Beauty_5.json")

In [7]:
data.show(5,truncate=True)

+----------+-------+-------+--------------------+-----------+--------------+------------+--------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|reviewerName|             summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------+------------+--------------------+--------------+
|7806397051| [3, 4]|    1.0|Very oily and cre...|01 30, 2014|A1YJEY40YUW4SE|      Andrea|Don't waste your ...|    1391040000|
|7806397051| [1, 1]|    3.0|This palette was ...|04 18, 2014| A60XNB876KYML|  Jessica H.|         OK Palette!|    1397779200|
|7806397051| [0, 1]|    4.0|The texture of th...| 09 6, 2013|A3G6XNM240RMWA|       Karen|       great quality|    1378425600|
|7806397051| [2, 2]|    2.0|I really can't te...| 12 8, 2013|A1PQFP6SAJ6D80|       Norah|Do not work on my...|    1386460800|
|7806397051| [0, 0]|    3.0|It was a little s...|10 19, 2013|A38FVHZTNQ271F|   Nova Amor|          It's okay.|    1382

In [8]:
data_sub = data.select(['asin', 'overall', 'reviewerID'])

In [9]:
data_sub.count()

198502

In [10]:
data_sub.show()

+----------+-------+--------------+
|      asin|overall|    reviewerID|
+----------+-------+--------------+
|7806397051|    1.0|A1YJEY40YUW4SE|
|7806397051|    3.0| A60XNB876KYML|
|7806397051|    4.0|A3G6XNM240RMWA|
|7806397051|    2.0|A1PQFP6SAJ6D80|
|7806397051|    3.0|A38FVHZTNQ271F|
|7806397051|    5.0|A3BTN14HIZET6Z|
|7806397051|    1.0|A1Z59RFKN0M5QL|
|7806397051|    2.0| AWUO9P6PL1SY8|
|9759091062|    2.0|A3LMILRM9OC3SA|
|9759091062|    3.0|A30IP88QK3YUIO|
|9759091062|    3.0| APBQH4BS48CQO|
|9759091062|    1.0|A3FE8W8UV95U6B|
|9759091062|    5.0|A1EVGDOTGFZOSS|
|9759091062|    1.0| AP5WTCMP6DTRV|
|9759091062|    5.0|A21IM16PQWKVO5|
|9759091062|    2.0|A1TLDR1V4O48PK|
|9759091062|    5.0| A6F8KH0J1AVYA|
|9759091062|    4.0| AXPKZA7UZXKTT|
|9759091062|    3.0|A2SIAYDK7GG7QA|
|9788072216|    5.0|A1QV5IH6HDRN0L|
+----------+-------+--------------+
only showing top 20 rows



In [11]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import isnan, when, count, col, udf

In [12]:
data_sub = data_sub.withColumn("overall", data_sub["overall"].cast(DoubleType()))

In [13]:
data_sub.show(5, truncate=True)

+----------+-------+--------------+
|      asin|overall|    reviewerID|
+----------+-------+--------------+
|7806397051|    1.0|A1YJEY40YUW4SE|
|7806397051|    3.0| A60XNB876KYML|
|7806397051|    4.0|A3G6XNM240RMWA|
|7806397051|    2.0|A1PQFP6SAJ6D80|
|7806397051|    3.0|A38FVHZTNQ271F|
+----------+-------+--------------+
only showing top 5 rows



In [14]:
# check isnull and isna

In [15]:
data_sub.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           data_sub.columns]).toPandas().T

Unnamed: 0,0
asin,0
overall,0
reviewerID,0


In [16]:
# Distinct users and movies
users = data_sub.select("reviewerID").distinct().count()
products = data_sub.select("asin").distinct().count()
numerator = data_sub.count()

In [17]:
display(numerator, users, products)

198502

22363

12101

In [18]:
#Number of rating matrix could contain if no empty cells
denominator=users*products
denominator

270614663

In [19]:
# Calculating sparsity
sparsity = 1 - (numerator*1 / denominator)
print("Sparsity: "), sparsity

Sparsity: 


(None, 0.9992664772935825)

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [21]:
# Converting String to index
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [22]:
# Create an indexer
indexer = StringIndexer(inputCol='asin', 
                        outputCol='asin_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(data_sub)

# Indexer creates a new column with numeric index values
data_indexed = indexer_model.transform(data_sub)

# Repeat the process for the other categorical feature
indexer1 = StringIndexer(inputCol='reviewerID', 
                         outputCol='reviewerID_idx')
indexer1_model = indexer1.fit(data_indexed)
data_indexed = indexer1_model.transform(data_indexed)

In [23]:
data_indexed.show(5, truncate=True)

+----------+-------+--------------+--------+--------------+
|      asin|overall|    reviewerID|asin_idx|reviewerID_idx|
+----------+-------+--------------+--------+--------------+
|7806397051|    1.0|A1YJEY40YUW4SE|  6194.0|       16983.0|
|7806397051|    3.0| A60XNB876KYML|  6194.0|       10399.0|
|7806397051|    4.0|A3G6XNM240RMWA|  6194.0|        5985.0|
|7806397051|    2.0|A1PQFP6SAJ6D80|  6194.0|       11765.0|
|7806397051|    3.0|A38FVHZTNQ271F|  6194.0|        5910.0|
+----------+-------+--------------+--------+--------------+
only showing top 5 rows



In [24]:
data_indexed.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           data_indexed.columns]).toPandas().T

Unnamed: 0,0
asin,0
overall,0
reviewerID,0
asin_idx,0
reviewerID_idx,0


In [25]:
# Smaller dataset so we will use 0.8 / 0.2
(training, test) = data_indexed.randomSplit([0.8, 0.2])

In [26]:
# Creating ALS model and fitting data
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [27]:
als = ALS(maxIter=10, 
          regParam=0.09, 
          rank = 25,
          userCol="reviewerID_idx", 
          itemCol="asin_idx", 
          ratingCol="overall", 
          coldStartStrategy="drop",
          nonnegative=True)
model = als.fit(training)

In [28]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)

In [29]:
predictions.show(5)

+----------+-------+--------------+--------+--------------+----------+
|      asin|overall|    reviewerID|asin_idx|reviewerID_idx|prediction|
+----------+-------+--------------+--------+--------------+----------+
|B005TI7NQW|    5.0|A29M09QBG9TZLP|   148.0|         148.0|  4.500859|
|B005TI7NQW|    2.0|A3CG93783LP0FO|   148.0|          31.0| 3.0827653|
|B005TI7NQW|    3.0|A2E7RX6AFUDQEX|   148.0|         961.0|  3.496467|
|B005TI7NQW|    1.0|A103BJIOJSDJL1|   148.0|         796.0| 3.6069114|
|B005TI7NQW|    5.0| ALQ4USPEQ9L5N|   148.0|         350.0| 4.1296864|
+----------+-------+--------------+--------+--------------+----------+
only showing top 5 rows



In [30]:
evaluator = RegressionEvaluator(metricName="rmse", 
                                labelCol="overall",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

In [31]:
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.2770569937693024


In [33]:
# save model to disk
model.save("Beauty_model_rec")

## Providing Recommendations: for all users¶


In [34]:
# get 5 recommendations which have highest rating.
user_recs = model.recommendForAllUsers(5) 

In [60]:
user_recs.printSchema()

root
 |-- reviewerID_idx: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- asin_idx: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [35]:
user_recs.show(10, truncate=False)

+--------------+----------------------------------------------------------------------------------------------------+
|reviewerID_idx|recommendations                                                                                     |
+--------------+----------------------------------------------------------------------------------------------------+
|1580          |[{6349, 5.1124573}, {9362, 4.9636087}, {11754, 4.916339}, {7721, 4.9063406}, {5849, 4.8832984}]     |
|4900          |[{3151, 6.2372746}, {6938, 6.0787635}, {5485, 6.0302296}, {3014, 6.009975}, {4983, 5.971571}]       |
|5300          |[{8407, 5.807998}, {7372, 5.763184}, {6349, 5.737353}, {8542, 5.653916}, {4654, 5.6474595}]         |
|6620          |[{11395, 6.2782483}, {12075, 6.2747087}, {12071, 6.2686257}, {12067, 6.2686257}, {12069, 6.2686257}]|
|7240          |[{10205, 5.226036}, {8394, 5.1818295}, {8952, 5.1741934}, {4496, 5.168859}, {7107, 5.156276}]       |
|7340          |[{8407, 5.4019723}, {10205, 5.2157407}, 

In [36]:
user_recs.count()

22361

In [37]:
for user in user_recs.head(5):
    print(user)
    print("\n")

Row(reviewerID_idx=1580, recommendations=[Row(asin_idx=6349, rating=5.112457275390625), Row(asin_idx=9362, rating=4.963608741760254), Row(asin_idx=11754, rating=4.916338920593262), Row(asin_idx=7721, rating=4.906340599060059), Row(asin_idx=5849, rating=4.883298397064209)])


Row(reviewerID_idx=4900, recommendations=[Row(asin_idx=3151, rating=6.237274646759033), Row(asin_idx=6938, rating=6.078763484954834), Row(asin_idx=5485, rating=6.030229568481445), Row(asin_idx=3014, rating=6.009974956512451), Row(asin_idx=4983, rating=5.97157096862793)])


Row(reviewerID_idx=5300, recommendations=[Row(asin_idx=8407, rating=5.807998180389404), Row(asin_idx=7372, rating=5.763184070587158), Row(asin_idx=6349, rating=5.7373528480529785), Row(asin_idx=8542, rating=5.653915882110596), Row(asin_idx=4654, rating=5.647459506988525)])


Row(reviewerID_idx=6620, recommendations=[Row(asin_idx=11395, rating=6.278248310089111), Row(asin_idx=12075, rating=6.2747087478637695), Row(asin_idx=12071, rating=6.26862573

## Save to file


In [38]:
from time import time

In [61]:
t0 = time()

In [62]:
df_reviewer_reviewer_id = data_indexed.select('reviewerID_idx', 'reviewerID').distinct()
df_reviewer_reviewer_id.count()


22363

In [63]:
df_reviewer_reviewer_id.show(5)


+--------------+--------------+
|reviewerID_idx|    reviewerID|
+--------------+--------------+
|        7501.0|A339O8ZW72WHZ0|
|           2.0| AKMEY1BSHSDG7|
|        9739.0|A30B9UTVDTUQ7Y|
|       11415.0|A1FMYCX030FIDE|
|         718.0| AUFB3GQJV10P0|
+--------------+--------------+
only showing top 5 rows



In [64]:
df_asin_asin_idx = data_indexed.select('asin_idx', 'asin').distinct()


In [65]:
df_asin_asin_idx.count()


12101

In [66]:
df_asin_asin_idx.show(5)


+--------+----------+
|asin_idx|      asin|
+--------+----------+
|  6203.0|B00005TZU8|
|  9941.0|B00005UN90|
|  3659.0|B000142ZFS|
|   810.0|B00027DDOQ|
|  3399.0|B0006IJA5C|
+--------+----------+
only showing top 5 rows



In [67]:
new_user_recs = user_recs.join(df_reviewer_reviewer_id, on=['reviewerID_idx'], how='left')


In [68]:
new_user_recs.show(10, truncate=False)


+--------------+------------------------------------------------------------------------------------------------+--------------+
|reviewerID_idx|recommendations                                                                                 |reviewerID    |
+--------------+------------------------------------------------------------------------------------------------+--------------+
|299           |[{4080, 5.5028005}, {3708, 5.441769}, {11045, 5.421581}, {4945, 5.391164}, {11458, 5.363096}]   |A2CZPM110DW516|
|305           |[{8050, 6.176378}, {3459, 6.1532326}, {8701, 6.132951}, {7358, 6.104656}, {4327, 6.0992346}]    |A3EPHBMU07LZ50|
|496           |[{8271, 5.0819235}, {1937, 5.0472856}, {1222, 4.9501734}, {10853, 4.923757}, {11379, 4.9205723}]|A2A5C9IQ06CG9N|
|558           |[{5273, 5.619913}, {12060, 5.5140877}, {8257, 5.507486}, {12087, 5.497896}, {9904, 5.4956317}]  |A9LWDP1HECAU0 |
|596           |[{8057, 5.1836796}, {7721, 5.046693}, {6349, 5.027194}, {5185, 5.0038695}, {8542,

In [69]:
new_user_recs.count()


22361

In [70]:
# Save to disk
new_user_recs.write.parquet('Beauty_U.parquet', mode='overwrite')
df_asin_asin_idx.write.parquet('Beauty_P.parquet', mode='overwrite')

In [71]:
time_duration = time() - t0
print(time_duration)

617.9989709854126


In [72]:
# Recommendation for reviewerID = 'AJK5XGCM6M68A'
find_user_rec = new_user_recs.filter(new_user_recs['reviewerID'] == 'AJK5XGCM6M68A')
user = find_user_rec.first() 
lst = []
for row in user['recommendations']:   
    row_f = df_asin_asin_idx.filter(df_asin_asin_idx.asin_idx == row['asin_idx'])  
    row_f_first = row_f.first()
    lst.append((row['asin_idx'], row_f_first['asin'], row['rating']))
dic_user_rec = {'reviewerID' : user.reviewerID, 'recommendations' :lst}  

In [73]:
dic_user_rec 


{'reviewerID': 'AJK5XGCM6M68A',
 'recommendations': [(9015, 'B002TYBGUS', 6.009134292602539),
  (4945, 'B000BZ4YB0', 5.972158908843994),
  (10205, 'B000JLAWIA', 5.968705654144287),
  (11458, 'B005KE1IH0', 5.95878791809082),
  (8407, 'B000C235A8', 5.906177043914795)]}

## Đọc 2 file đã lưu để lấy dữ liệu đầu vào => Đề xuất

In [74]:
# Read the Parquet file into a new DataFrame 
new_user_recs = spark.read.parquet('Beauty_U.parquet')

In [75]:
new_user_recs.printSchema()

root
 |-- reviewerID_idx: integer (nullable = true)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- asin_idx: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)
 |-- reviewerID: string (nullable = true)



In [76]:
new_user_recs.show(2)

+--------------+--------------------+--------------+
|reviewerID_idx|     recommendations|    reviewerID|
+--------------+--------------------+--------------+
|           322|[{5353, 5.362688}...|A2EBR70V6P9W0N|
|           417|[{8617, 5.749925}...|A1BD342U8BF3UC|
+--------------+--------------------+--------------+
only showing top 2 rows



In [77]:
df_asin_asin_idx = spark.read.parquet('Beauty_P.parquet')


In [78]:
df_asin_asin_idx.printSchema()

root
 |-- asin_idx: double (nullable = true)
 |-- asin: string (nullable = true)



In [79]:
df_asin_asin_idx.show(2)

+--------+----------+
|asin_idx|      asin|
+--------+----------+
|  6194.0|7806397051|
|  5523.0|B0000AJ3PT|
+--------+----------+
only showing top 2 rows



In [80]:
# Recommendation for reviewerID = 'AJK5XGCM6M68A'
find_user_rec = new_user_recs.filter(new_user_recs['reviewerID'] == 'AJK5XGCM6M68A')
find_user_rec.show(truncate=False)

+--------------+----------------------------------------------------------------------------------------------+-------------+
|reviewerID_idx|recommendations                                                                               |reviewerID   |
+--------------+----------------------------------------------------------------------------------------------+-------------+
|21521         |[{9015, 6.0091343}, {4945, 5.972159}, {10205, 5.9687057}, {11458, 5.958788}, {8407, 5.906177}]|AJK5XGCM6M68A|
+--------------+----------------------------------------------------------------------------------------------+-------------+



In [81]:
result = ''
for user in find_user_rec.collect():
  lst = []
  for row in user['recommendations']:   
      print(row)
      row_f = df_asin_asin_idx.filter(df_asin_asin_idx.asin_idx == row['asin_idx'])  
      row_f_first = row_f.first()
      lst.append((row['asin_idx'], row_f_first['asin'], row['rating']))
  dic_user_rec = {'reviewerID' : user.reviewerID, 'recommendations' :lst}     
  result = dic_user_rec

Row(asin_idx=9015, rating=6.009134292602539)
Row(asin_idx=4945, rating=5.972158908843994)
Row(asin_idx=10205, rating=5.968705654144287)
Row(asin_idx=11458, rating=5.95878791809082)
Row(asin_idx=8407, rating=5.906177043914795)


In [82]:
print("Recommendation for: ", 'AJK5XGCM6M68A')
print(result)

Recommendation for:  AJK5XGCM6M68A
{'reviewerID': 'AJK5XGCM6M68A', 'recommendations': [(9015, 'B002TYBGUS', 6.009134292602539), (4945, 'B000BZ4YB0', 5.972158908843994), (10205, 'B000JLAWIA', 5.968705654144287), (11458, 'B005KE1IH0', 5.95878791809082), (8407, 'B000C235A8', 5.906177043914795)]}


In [83]:
from pyspark.sql.functions import col, explode

In [84]:
find_user_rec = find_user_rec.select(find_user_rec.reviewerID, explode(find_user_rec.recommendations))


In [88]:
find_user_rec = find_user_rec.withColumn('asin', find_user_rec.col.getField("asin_idx")).withColumn("rating",find_user_rec.col.getField("rating"))

In [89]:
find_user_rec.printSchema()

root
 |-- reviewerID: string (nullable = true)
 |-- col: struct (nullable = true)
 |    |-- asin_idx: integer (nullable = true)
 |    |-- rating: float (nullable = true)
 |-- asin: integer (nullable = true)
 |-- rating: float (nullable = true)



In [92]:
#filter all movideIDS HAVING RATING >=3.0
find_user_rec.show()

+-------------+------------------+-----+---------+
|   reviewerID|               col| asin|   rating|
+-------------+------------------+-----+---------+
|AJK5XGCM6M68A| {9015, 6.0091343}| 9015|6.0091343|
|AJK5XGCM6M68A|  {4945, 5.972159}| 4945| 5.972159|
|AJK5XGCM6M68A|{10205, 5.9687057}|10205|5.9687057|
|AJK5XGCM6M68A| {11458, 5.958788}|11458| 5.958788|
|AJK5XGCM6M68A|  {8407, 5.906177}| 8407| 5.906177|
+-------------+------------------+-----+---------+



In [95]:
find_user_rec.filter(find_user_rec.rating > 3.0).show(3)


+-------------+------------------+-----+---------+
|   reviewerID|               col| asin|   rating|
+-------------+------------------+-----+---------+
|AJK5XGCM6M68A| {9015, 6.0091343}| 9015|6.0091343|
|AJK5XGCM6M68A|  {4945, 5.972159}| 4945| 5.972159|
|AJK5XGCM6M68A|{10205, 5.9687057}|10205|5.9687057|
+-------------+------------------+-----+---------+
only showing top 3 rows

