# Chapter 12: Recommender System
## Ex2: Beauty Recommendation

### Dataset: ratings_Beauty.csv
#### Read more about the dataset: http://jmcauley.ucsd.edu/data/amazon/
### Requirement:
- Read dataset
- Pre-process data
- Use `reviewerID`, `asin` (product ID) and `overall` (the user's rating of the product) to build a model that predicts ratings, then use the predictions to give recommendations to users.

In [1]:
# Make the local Spark installation's pyspark importable; must run before the pyspark imports.
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf

In [5]:
# Spark master URL: change it to point at your cluster's Spark master.
SPARK_MASTER_URL = 'spark://172.25.51.23:7077'

# Executor memory ('12g') is environment-specific; adjust it to the worker resources.
# Settings go into an explicit SparkConf instead of a process-wide JVM system property.
conf = (SparkConf()
        .setMaster(SPARK_MASTER_URL)
        .setAppName('Recommendation_Beauty')
        .set('spark.executor.memory', '12g'))
sc = SparkContext(conf=conf)

In [6]:
# Wrap the existing SparkContext in a SparkSession for the DataFrame API.
spark = SparkSession(sparkContext=sc)

In [7]:
# Change the HDFS server address (IP:port) to match your cluster.
# The file has no header row, so Spark names the columns _c0.._c3.
data = spark.read.format("csv").load("hdfs://172.24.40.247:19000/ratings_Beauty.csv")

In [8]:
# Peek at the raw rows.
data.show(n=5, truncate=True)

+--------------+----------+---+----------+
|           _c0|       _c1|_c2|       _c3|
+--------------+----------+---+----------+
|A39HTATAQ9V7YF|0205616461|5.0|1369699200|
|A3JM6GV9MNOF9X|0558925278|3.0|1355443200|
|A1Z513UWSAAO0F|0558925278|5.0|1404691200|
|A1WMRR494NWEWV|0733001998|4.0|1382572800|
|A3IAAVS479H7M7|0737104473|1.0|1274227200|
+--------------+----------+---+----------+
only showing top 5 rows



In [9]:
# Keep user, product and rating; drop _c3 (presumably a Unix timestamp, not used by the model).
data_sub = data.select('_c0', '_c1', '_c2')

In [10]:
# Total number of rows (ratings) after selecting the three modeling columns.
data_sub.count()

2023070

In [11]:
# Preview the selected columns.
data_sub.show(n=5, truncate=True)

+--------------+----------+---+
|           _c0|       _c1|_c2|
+--------------+----------+---+
|A39HTATAQ9V7YF|0205616461|5.0|
|A3JM6GV9MNOF9X|0558925278|3.0|
|A1Z513UWSAAO0F|0558925278|5.0|
|A1WMRR494NWEWV|0733001998|4.0|
|A3IAAVS479H7M7|0737104473|1.0|
+--------------+----------+---+
only showing top 5 rows



In [12]:
# Target column names: reviewerID (user), asin (product ID), overall (rating)

In [13]:
# Give the header-less columns meaningful names: _c0 -> reviewerID, _c1 -> asin, _c2 -> overall.
data_sub = data_sub.toDF("reviewerID", "asin", "overall")

In [14]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import isnan, when, count, col, udf

In [15]:
# Preview after renaming.
data_sub.show(n=5, truncate=True)

+--------------+----------+-------+
|    reviewerID|      asin|overall|
+--------------+----------+-------+
|A39HTATAQ9V7YF|0205616461|    5.0|
|A3JM6GV9MNOF9X|0558925278|    3.0|
|A1Z513UWSAAO0F|0558925278|    5.0|
|A1WMRR494NWEWV|0733001998|    4.0|
|A3IAAVS479H7M7|0737104473|    1.0|
+--------------+----------+-------+
only showing top 5 rows



In [16]:
# All columns are still strings at this point (CSV read without schema inference).
data_sub.printSchema()

root
 |-- reviewerID: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- overall: string (nullable = true)



In [17]:
# The rating must be numeric for ALS; unparseable values would become NULL (checked below).
data_sub = data_sub.withColumn("overall", col("overall").cast(DoubleType()))

In [18]:
# Confirm that 'overall' is now a double column.
data_sub.printSchema()

root
 |-- reviewerID: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)



In [19]:
# Number of NULL values per column, shown as a transposed pandas frame.
data_sub.select(*(
    count(when(col(name).isNull(), name)).alias(name)
    for name in data_sub.columns
)).toPandas().T

Unnamed: 0,0
reviewerID,0
asin,0
overall,0


In [20]:
# Distinct users (reviewers), distinct products (asin) and the total number of ratings.
# All three come from one aggregation, so the un-cached source is scanned once
# instead of by three separate count() jobs.
# countDistinct ignores NULLs; the NULL check above found none in these columns.
from pyspark.sql.functions import countDistinct

rating_stats = data_sub.agg(
    countDistinct("reviewerID").alias("users"),
    countDistinct("asin").alias("products"),
    count("*").alias("ratings"),
).first()
users = rating_stats["users"]
products = rating_stats["products"]
numerator = rating_stats["ratings"]

In [21]:
# Show total ratings, distinct users and distinct products.
display(numerator, users, products)

2023070

1210271

249274

In [22]:
# Number of cells in the full user x product rating matrix (ratings possible if none were missing).
denominator = products * users
denominator

301689093254

In [23]:
# Sparsity = share of the user x product matrix that has no rating.
# Python 3 `/` is true division, so no float cast is needed.
sparsity = 1 - numerator / denominator
# Pass both values to print(); a trailing `, sparsity` outside the call would
# only build a tuple and never print the number.
print("Sparsity:", sparsity)

Sparsity: 


(None, 0.999993294189133)

In [24]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [25]:
# Converting String to index
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [26]:
# Map string IDs to numeric indices for ALS's user/item columns:
#   asin -> asin_idx, reviewerID -> reviewerID_idx.
indexer = StringIndexer(inputCol='asin', outputCol='asin_idx')
indexer1 = StringIndexer(inputCol='reviewerID', outputCol='reviewerID_idx')

# A Pipeline fits the stages in order: the reviewer indexer is fitted on the
# output of the product indexer, the same as fitting them one after another.
indexing_model = Pipeline(stages=[indexer, indexer1]).fit(data_sub)
indexer_model, indexer1_model = indexing_model.stages
data_indexed = indexing_model.transform(data_sub)

In [27]:
# Preview the indexed columns next to the original IDs.
data_indexed.show(n=5, truncate=True)

+--------------+----------+-------+--------+--------------+
|    reviewerID|      asin|overall|asin_idx|reviewerID_idx|
+--------------+----------+-------+--------+--------------+
|A39HTATAQ9V7YF|0205616461|    5.0|145790.0|       70392.0|
|A3JM6GV9MNOF9X|0558925278|    3.0|103581.0|      265306.0|
|A1Z513UWSAAO0F|0558925278|    5.0|103581.0|      552933.0|
|A1WMRR494NWEWV|0733001998|    4.0|145791.0|      536779.0|
|A3IAAVS479H7M7|0737104473|    1.0|145792.0|       14679.0|
+--------------+----------+-------+--------+--------------+
only showing top 5 rows



In [28]:
# The StringIndexer outputs (asin_idx, reviewerID_idx) are double columns.
data_indexed.printSchema()

root
 |-- reviewerID: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- asin_idx: double (nullable = false)
 |-- reviewerID_idx: double (nullable = false)



In [29]:
# Re-check for NULLs, including the new index columns.
data_indexed.select(*(
    count(when(col(name).isNull(), name)).alias(name)
    for name in data_indexed.columns
)).toPandas().T

Unnamed: 0,0
reviewerID,0
asin,0
overall,0
asin_idx,0
reviewerID_idx,0


In [30]:
# 80/20 train/test split. A fixed seed makes the split, and therefore the
# reported RMSE, reproducible when the notebook is re-run.
(training, test) = data_indexed.randomSplit([0.8, 0.2], seed=42)

In [31]:
# Creating ALS model and fitting data
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from time import time

In [32]:
# Train an explicit-feedback ALS model on the training split and time the fit.
t0 = time()
als = ALS(maxIter=10,
          regParam=0.09,
          rank=25,
          userCol="reviewerID_idx",
          itemCol="asin_idx",
          ratingCol="overall",
          # Drop NaN predictions for users/items not seen in training so
          # evaluation metrics are not NaN.
          coldStartStrategy="drop",
          nonnegative=True,
          # Explicit seed so the random factor initialisation is reproducible.
          seed=42)
model = als.fit(training)
t1 = time()

In [33]:
# Wall-clock seconds spent in the ALS fit cell (t0/t1 taken around als.fit).
total_time = t1 - t0

In [34]:
# Display the training time in seconds.
total_time

457.0389144420624

In [35]:
# Score the held-out test set; RMSE is computed from these predictions afterwards.
predictions = model.transform(dataset=test)

In [36]:
# Compare a few true ratings with the predictions.
predictions.show(n=3)

+--------------+----------+-------+--------+--------------+----------+
|    reviewerID|      asin|overall|asin_idx|reviewerID_idx|prediction|
+--------------+----------+-------+--------+--------------+----------+
|A18K6042A1DZMB|B001TJXI5U|    5.0|   148.0|        2563.0|  3.454518|
|A1VJ6OQQNKOGS1|B001TJXI5U|    5.0|   148.0|        6396.0|  4.119822|
| A9A7OP15VV1HH|B001TJXI5U|    5.0|   148.0|      288452.0| 1.3730983|
+--------------+----------+-------+--------+--------------+----------+
only showing top 3 rows



In [37]:
# Root-mean-square error between the true rating ("overall") and the ALS prediction.
evaluator = RegressionEvaluator(labelCol="overall",
                                predictionCol="prediction",
                                metricName="rmse")
rmse = evaluator.evaluate(dataset=predictions)

In [38]:
# Report the test-set RMSE.
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 1.826794010926713


In [39]:
# On average, the predicted ratings are off by ~1.83 rating points (RMSE) on the 1–5 scale.

In [41]:
# Change the HDFS server address (IP:port) to match your cluster.
# overwrite() lets the notebook be re-run: plain model.save() raises if the
# target path already exists.
model.write().overwrite().save("hdfs://172.24.40.247:19000/Beauty_model_1")

### Providing recommendations for all users

In [42]:
# For every user known to the model, the 5 items with the highest predicted rating.
user_recs = model.recommendForAllUsers(numItems=5)

In [43]:
# Show full recommendation arrays (no truncation).
user_recs.show(n=5, truncate=False)

+--------------+------------------------------------------------------------------------------------------------------+
|reviewerID_idx|recommendations                                                                                       |
+--------------+------------------------------------------------------------------------------------------------------+
|148           |[{170096, 6.713969}, {58816, 6.5644584}, {99440, 6.531107}, {67337, 6.4514675}, {44910, 6.430691}]    |
|463           |[{64728, 7.0804825}, {128575, 6.9111643}, {65569, 6.8374567}, {101861, 6.7879744}, {95768, 6.7163925}]|
|471           |[{63851, 6.493028}, {46354, 6.4782224}, {62057, 6.3898325}, {132675, 6.315345}, {66484, 6.2931895}]   |
|496           |[{35186, 7.5983624}, {88181, 7.5334196}, {65299, 7.497033}, {25761, 7.4778576}, {208458, 7.4149003}]  |
|833           |[{68433, 7.875903}, {122648, 7.8294396}, {100351, 7.80609}, {71701, 7.7720633}, {85901, 7.76516}]     |
+--------------+------------------------

In [44]:
# Number of users with recommendations. NOTE(review): presumably lower than the
# distinct-user count because only users seen in `training` have ALS factors.
user_recs.count()

1025426

In [45]:
# Print the first five recommendation rows, separated by blank lines.
for rec_row in user_recs.head(5):
    print(rec_row, end="\n\n\n")

Row(reviewerID_idx=148, recommendations=[Row(asin_idx=170096, rating=6.7139692306518555), Row(asin_idx=58816, rating=6.56445837020874), Row(asin_idx=99440, rating=6.531106948852539), Row(asin_idx=67337, rating=6.451467514038086), Row(asin_idx=44910, rating=6.430690765380859)])


Row(reviewerID_idx=463, recommendations=[Row(asin_idx=64728, rating=7.080482482910156), Row(asin_idx=128575, rating=6.911164283752441), Row(asin_idx=65569, rating=6.837456703186035), Row(asin_idx=101861, rating=6.7879743576049805), Row(asin_idx=95768, rating=6.716392517089844)])


Row(reviewerID_idx=471, recommendations=[Row(asin_idx=63851, rating=6.493028163909912), Row(asin_idx=46354, rating=6.478222370147705), Row(asin_idx=62057, rating=6.389832496643066), Row(asin_idx=132675, rating=6.31534481048584), Row(asin_idx=66484, rating=6.293189525604248)])


Row(reviewerID_idx=496, recommendations=[Row(asin_idx=35186, rating=7.598362445831299), Row(asin_idx=88181, rating=7.533419609069824), Row(asin_idx=65299, rati

### Converting numeric indices back to the original string IDs

In [46]:
# Start a timer for the ID back-conversion steps.
# NOTE(review): no later visible cell reads t0 (no matching t1 / elapsed time), so this is currently dead.
t0 = time()

In [47]:
# Lookup table: numeric reviewer index -> original reviewerID string.
df_reviewer_reviewer_id = data_indexed.select(['reviewerID_idx', 'reviewerID']).distinct()

In [48]:
# Should equal the distinct-user count: each reviewerID maps to exactly one index.
df_reviewer_reviewer_id.count()

1210271

In [49]:
# Preview the reviewer lookup table.
df_reviewer_reviewer_id.show(n=5)

+--------------+--------------+
|reviewerID_idx|    reviewerID|
+--------------+--------------+
|     1099573.0| AIZ6C15NC0P8Q|
|        5789.0|A37ITFQMWUSVU8|
|      183788.0|A1SHFWWZQJEZVM|
|      306399.0| AN7PFLY2LOV41|
|      212749.0|A2EZTXR3AN3WK8|
+--------------+--------------+
only showing top 5 rows



In [50]:
# Lookup table: numeric product index -> original asin string.
df_asin_asin_idx = data_indexed.select(['asin_idx', 'asin']).distinct()

In [51]:
# Should equal the distinct-product count: each asin maps to exactly one index.
df_asin_asin_idx.count()

249274

In [52]:
# Preview the product lookup table.
df_asin_asin_idx.show(n=5)

+--------+----------+
|asin_idx|      asin|
+--------+----------+
|  7333.0|B000C2106E|
| 17389.0|B000C214PQ|
| 37191.0|B000C219N8|
|150178.0|B000CD32K0|
| 58321.0|B000CILIK6|
+--------+----------+
only showing top 5 rows



In [53]:
# Attach the original reviewerID string to each user's recommendations (left join keeps every row).
new_user_recs = user_recs.join(df_reviewer_reviewer_id, 'reviewerID_idx', 'left')

In [54]:
# Preview recommendations with the reviewerID attached.
new_user_recs.show(n=5, truncate=False)

+--------------+-----------------------------------------------------------------------------------------------------+--------------+
|reviewerID_idx|recommendations                                                                                      |reviewerID    |
+--------------+-----------------------------------------------------------------------------------------------------+--------------+
|299           |[{60653, 6.206674}, {95432, 6.194054}, {64754, 5.942935}, {78772, 5.939616}, {108753, 5.925044}]     |ATV36X9V9DRB9 |
|305           |[{110666, 7.772785}, {35186, 7.612976}, {37943, 7.585026}, {72394, 7.572181}, {46360, 7.561183}]     |A3UZ99BQWAJACU|
|496           |[{35186, 7.5983624}, {88181, 7.5334196}, {65299, 7.497033}, {25761, 7.4778576}, {208458, 7.4149003}] |A2Q9KC6JXDG5JA|
|558           |[{67353, 7.557464}, {67657, 7.3952146}, {103220, 7.352634}, {39316, 7.347481}, {48198, 7.3466105}]   |A2CHTRYAGL7P04|
|596           |[{205491, 7.3536673}, {78161, 7.344131}, {8923

In [55]:
# Row count after the left join; should match user_recs.count().
new_user_recs.count()

1025426

In [56]:
# Change the HDFS server address (IP:port) to match your cluster.
# Persist the per-user recommendations and the product index -> asin lookup table.
new_user_recs.write.mode('overwrite').parquet('hdfs://172.24.40.247:19000/Beauty_U_N.parquet_1')
df_asin_asin_idx.write.mode('overwrite').parquet('hdfs://172.24.40.247:19000/Beauty_P_N.parquet_1')

In [57]:
# Stop the SparkContext to release this application's cluster resources.
sc.stop()