In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar -xvf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
import findspark
findspark.init()

spark-2.4.0-bin-hadoop2.7/
spark-2.4.0-bin-hadoop2.7/python/
spark-2.4.0-bin-hadoop2.7/python/setup.cfg
spark-2.4.0-bin-hadoop2.7/python/pyspark/
spark-2.4.0-bin-hadoop2.7/python/pyspark/resultiterable.py
spark-2.4.0-bin-hadoop2.7/python/pyspark/python/
spark-2.4.0-bin-hadoop2.7/python/pyspark/python/pyspark/
spark-2.4.0-bin-hadoop2.7/python/pyspark/python/pyspark/shell.py
spark-2.4.0-bin-hadoop2.7/python/pyspark/heapq3.py
spark-2.4.0-bin-hadoop2.7/python/pyspark/join.py
spark-2.4.0-bin-hadoop2.7/python/pyspark/version.py
spark-2.4.0-bin-hadoop2.7/python/pyspark/rdd.py
spark-2.4.0-bin-hadoop2.7/python/pyspark/java_gateway.py
spark-2.4.0-bin-hadoop2.7/python/pyspark/find_spark_home.py
spark-2.4.0-bin-hadoop2.7/python/pyspark/_globals.py
spark-2.4.0-bin-hadoop2.7/python/pyspark/worker.py
spark-2.4.0-bin-hadoop2.7/python/pyspark/accumulators.py
spark-2.4.0-bin-hadoop2.7/python/pyspark/mllib/
spark-2.4.0-bin-hadoop2.7/python/pyspark/mllib/feature.py
spark-2.4.0-bin-hadoop2.7/python/pyspark

In [2]:
from google.colab import drive

# Attach Google Drive at /content/gdrive, requesting a fresh mount each time.
drive.mount(
    '/content/gdrive',
    force_remount=True,
)

Mounted at /content/gdrive


In [3]:
# Work from the project folder on the mounted Drive so relative paths resolve there.
%cd '/content/gdrive/My Drive/LDS0_K273_ONLINE_DoThiPhuong/Topic_2/'

/content/gdrive/My Drive/LDS0_K273_ONLINE_DoThiPhuong/Topic_2


In [4]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, explode

In [5]:
# Reuse the active SparkContext if there is one. A bare SparkContext() raises
# ValueError when this cell is executed a second time in the same kernel.
sc = SparkContext.getOrCreate()

In [6]:
# Obtain the SparkSession for this notebook, requesting the app name 'rec'.
session_builder = SparkSession.builder.appName('rec')
spark = session_builder.getOrCreate()

In [7]:
# Load the reviews CSV: the first line is the header, column types are inferred.
data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('Reviews.csv')
)

In [8]:
# Preview the first five rows without truncating cell contents.
data.show(n=5, truncate=False)

+---+---+-----------+----------+------+
|_c0|id |customer_id|product_id|rating|
+---+---+-----------+----------+------+
|0  |0  |709310     |10001012  |3     |
|1  |1  |10701688   |10001012  |5     |
|2  |2  |11763074   |10001012  |5     |
|3  |3  |9909549    |10001012  |5     |
|4  |4  |1827148    |10001012  |5     |
+---+---+-----------+----------+------+
only showing top 5 rows



In [9]:
# Show the column names and the types inferred while reading the CSV.
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



In [10]:
# Dataset dimensions: distinct customers, distinct products, and total rating rows.
customer_ids = data.select('customer_id').distinct()
product_ids = data.select('product_id').distinct()

customers = customer_ids.count()
products = product_ids.count()
numerator = data.count()

In [11]:
# Show, in order: number of rating rows, distinct customers, distinct products.
display(numerator, customers, products)

361090

251149

4214

In [12]:
# Number of cells in the full customer x product matrix (every possible rating).
denominator = products * customers
denominator

1058341886

In [13]:
# Sparsity of the data: share of customer/product pairs that have no rating.
fill_ratio = numerator / float(denominator)
sparsity = 1 - fill_ratio
print('Sparsity: ', sparsity)

Sparsity:  0.999658815355627


In [14]:
# Number of reviews per rating value.
rating_counts = data.groupBy("rating").count()
rating_counts.show()

+------+------+
|rating| count|
+------+------+
|     1| 16616|
|     3| 20600|
|     5|256211|
|     4| 60565|
|     2|  7098|
+------+------+



In [15]:
# Hold out roughly 20% of the reviews for evaluation. A fixed seed keeps the
# split, and therefore every metric computed from it, the same across re-runs.
(training, test) = data.randomSplit([0.8, 0.2], seed=42)

In [16]:
# Baseline ALS recommender, trained on the training split only.
als = ALS(
    userCol='customer_id',
    itemCol='product_id',
    ratingCol='rating',
    maxIter=20,
    regParam=0.1,
    nonnegative=True,
    coldStartStrategy='drop',
)
model = als.fit(training)

In [17]:
# Score the held-out test split; adds a `prediction` column next to the true rating.
predictions = model.transform(test)

In [18]:
# Peek at a few predictions next to the true ratings.
predictions.show(n=5)

+-----+-----+-----------+----------+------+----------+
|  _c0|   id|customer_id|product_id|rating|prediction|
+-----+-----+-----------+----------+------+----------+
|48567|48567|    7523016|   1675793|     5| 3.7366116|
|48573|48573|    2225318|   1675793|     5| 2.9967952|
|48568|48568|   13098881|   1675793|     4|  2.769885|
|68613|68613|   13676965|   2069769|     4| 2.8575258|
|68597|68597|   11137984|   2069769|     2| 2.0279472|
+-----+-----+-----------+----------+------+----------+
only showing top 5 rows



In [19]:
# RMSE between the true `rating` and the model's `prediction`.
evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='rating',
    metricName='rmse',
)
rmse = evaluator.evaluate(predictions)
rmse

1.2721176230827271

In [20]:
# Summary statistics of the rating column, used to put the RMSE in context.
rating_summary = data.describe(['rating'])
rating_summary.show()

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|            361090|
|   mean| 4.475136392589105|
| stddev|1.0166716679634682|
|    min|                 1|
|    max|                 5|
+-------+------------------+



RMSE đang rất cao: 1.27 so với độ lệch chuẩn (std) của rating là 1.01 => cần tuning tham số để cải thiện model.

In [21]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [22]:
# initialize the ALS model
als_model = ALS(userCol='customer_id', itemCol='product_id', ratingCol='rating', coldStartStrategy='drop', nonnegative=True)

# create the parameter grid
params = ParamGridBuilder().addGrid(als_model.regParam, [.01, .05, .1, .15]).addGrid(als_model.rank, [5, 10, 50, 100]).build()

# create crossvalidator estimator
cv = CrossValidator(estimator = als_model, estimatorParamMaps= params, evaluator= evaluator, parallelism = 4)
best_model = cv.fit(data)
model = best_model.bestModel

In [23]:
# Re-score the test split with the model selected by cross-validation and report its RMSE.
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print('RMSE:', rmse)

RMSE: 0.2951857150515815


Recommender có sự cải thiện lớn về sai số, giảm từ 1.27 xuống 0.295.

Đề xuất cho mỗi khách hàng 5 sản phẩm có rating dự đoán cao nhất

In [24]:
# Top-5 recommended products for every customer known to the model.
user_recs = model.recommendForAllUsers(numItems=5)

In [25]:
# Inspect the structure of the recommendations column.
user_recs.printSchema()

root
 |-- customer_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [26]:
# Print the first five per-customer recommendation rows.
for rec_row in user_recs.head(5):
    print(rec_row)

Row(customer_id=471, recommendations=[Row(product_id=58545325, rating=6.797727584838867), Row(product_id=44579109, rating=6.792884826660156), Row(product_id=24558526, rating=6.419763565063477), Row(product_id=81926706, rating=6.204355239868164), Row(product_id=75615357, rating=6.152498245239258)])
Row(customer_id=833, recommendations=[Row(product_id=15212692, rating=6.687552452087402), Row(product_id=74556930, rating=6.595274925231934), Row(product_id=58545325, rating=6.488923072814941), Row(product_id=29248443, rating=6.3208184242248535), Row(product_id=73952674, rating=6.287042617797852)])
Row(customer_id=7993, recommendations=[Row(product_id=74893380, rating=6.878391265869141), Row(product_id=38519367, rating=6.798414707183838), Row(product_id=56936788, rating=6.766576290130615), Row(product_id=68454398, rating=6.670620441436768), Row(product_id=29248443, rating=6.643612861633301)])
Row(customer_id=9427, recommendations=[Row(product_id=75402106, rating=1.3466577529907227), Row(produ

In [27]:
# Look up the stored recommendations of a single customer.
customer_id = 997878
result = user_recs.where(col('customer_id') == customer_id)
result.show(truncate=False)

+-----------+-----------------------------------------------------------------------------------------------------------------+
|customer_id|recommendations                                                                                                  |
+-----------+-----------------------------------------------------------------------------------------------------------------+
|997878     |[[49078809, 7.3777604], [44579109, 6.8818226], [55366883, 6.872575], [58545325, 6.819825], [72969822, 6.6251655]]|
+-----------+-----------------------------------------------------------------------------------------------------------------+



In [28]:
# Flatten the recommendations of `customer_id` into one row per product.
# The frame is derived from `user_recs` instead of overwriting `result` with a
# transform of itself, so re-running this cell does not fail on a missing
# `recommendations` column.
exploded_recs = (
    user_recs
    .filter(user_recs['customer_id'] == customer_id)
    # explode() without an alias names its output column `col`
    .select('customer_id', explode('recommendations'))
)
# Pull the struct fields out into plain columns.
result = (
    exploded_recs
    .withColumn('product_id', exploded_recs['col'].getField('product_id'))
    .withColumn('rating', exploded_recs['col'].getField('rating'))
)
result.show()

+-----------+--------------------+----------+---------+
|customer_id|                 col|product_id|   rating|
+-----------+--------------------+----------+---------+
|     997878|[49078809, 7.3777...|  49078809|7.3777604|
|     997878|[44579109, 6.8818...|  44579109|6.8818226|
|     997878|[55366883, 6.872575]|  55366883| 6.872575|
|     997878|[58545325, 6.819825]|  58545325| 6.819825|
|     997878|[72969822, 6.6251...|  72969822|6.6251655|
+-----------+--------------------+----------+---------+



In [29]:
# Keep only recommendations whose predicted rating is at least 3.0.
result.where(result['rating'] >= 3.0).show()

+-----------+--------------------+----------+---------+
|customer_id|                 col|product_id|   rating|
+-----------+--------------------+----------+---------+
|     997878|[49078809, 7.3777...|  49078809|7.3777604|
|     997878|[44579109, 6.8818...|  44579109|6.8818226|
|     997878|[55366883, 6.872575]|  55366883| 6.872575|
|     997878|[58545325, 6.819825]|  58545325| 6.819825|
|     997878|[72969822, 6.6251...|  72969822|6.6251655|
+-----------+--------------------+----------+---------+



In [30]:
# One row per distinct customer id in the reviews data.
df_customer_id = data.select(data['customer_id']).distinct()

In [31]:
# Number of distinct customers.
df_customer_id.count()

251149

In [32]:
# Sample of the distinct customer ids.
df_customer_id.show(n=5)

+-----------+
|customer_id|
+-----------+
|    7621182|
|    7920941|
|   12999702|
|    8658313|
|     681118|
+-----------+
only showing top 5 rows



In [33]:
# One row per distinct product id in the reviews data.
df_product_id = data.select(data['product_id']).distinct()

In [34]:
# Sample of the distinct product ids.
df_product_id.show(n=5)

+----------+
|product_id|
+----------+
|  10723695|
|  11415485|
|  13119283|
|   1675793|
|  16963971|
+----------+
only showing top 5 rows



In [35]:
# Number of distinct products.
df_product_id.count()

4214

In [36]:
# Left-join the recommendations with the distinct customer ids on `customer_id`.
# NOTE(review): `user_recs` is the left side, so the join keeps exactly the
# customers that already have recommendations - confirm this direction is intended.
new_user_recs = user_recs.join(df_customer_id, 'customer_id', 'left')

In [37]:
# Preview the joined recommendations without truncating the arrays.
new_user_recs.show(n=5, truncate=False)

+-----------+-------------------------------------------------------------------------------------------------------------------+
|customer_id|recommendations                                                                                                    |
+-----------+-------------------------------------------------------------------------------------------------------------------+
|471        |[[58545325, 6.7977276], [44579109, 6.792885], [24558526, 6.4197636], [81926706, 6.2043552], [75615357, 6.1524982]] |
|833        |[[15212692, 6.6875525], [74556930, 6.595275], [58545325, 6.488923], [29248443, 6.3208184], [73952674, 6.2870426]]  |
|7993       |[[74893380, 6.8783913], [38519367, 6.7984147], [56936788, 6.7665763], [68454398, 6.6706204], [29248443, 6.643613]] |
|9427       |[[75402106, 1.3466578], [49078809, 1.3431011], [73329078, 1.3311408], [58545325, 1.3100071], [50091437, 1.3048438]]|
|17420      |[[57440303, 6.8596706], [81926706, 6.5114207], [49385947, 6.350623], [7455692

In [38]:
# Number of customers that have a recommendation row.
new_user_recs.count()

251149

In [39]:
# Persist the recommendations and the product id list, replacing earlier exports.
new_user_recs.write.mode('overwrite').parquet('Rec_U.parquet')
df_product_id.write.mode('overwrite').parquet('Rec_P.parquet')

In [40]:
# Recommendation for customer_id = 18866
customer_id = 18866
find_user_rec = new_user_recs.filter(new_user_recs['customer_id'] == customer_id)
user = find_user_rec.first() 
lst = []
for row in user['recommendations']:   
    row_f = df_product_id.filter(df_product_id.product_id == row['product_id'])  
    row_f_first = row_f.first()
    lst.append((row_f_first['product_id'], row['rating']))
dic_user_rec = {'customer_id' : user.customer_id, 'recommendations' :lst}

In [41]:
# Display the recommendation dictionary built in the previous cell.
dic_user_rec

{'customer_id': 18866,
 'recommendations': [(57440303, 6.773859024047852),
  (58545325, 6.455607891082764),
  (81926706, 6.252448081970215),
  (46627072, 6.1244001388549805),
  (73952674, 6.105007171630859)]}

In [42]:
# Reload the per-customer recommendations from the Parquet export.
new_user_recs = spark.read.format('parquet').load('Rec_U.parquet')

In [43]:
# Check the schema of the recommendations as read back from Parquet.
new_user_recs.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [44]:
# Quick look at the reloaded recommendations.
new_user_recs.show(n=2)

+-----------+--------------------+
|customer_id|     recommendations|
+-----------+--------------------+
|       3898|[[58545325, 7.315...|
|      21786|[[58545325, 1.419...|
+-----------+--------------------+
only showing top 2 rows



In [45]:
# Reload the distinct product ids from the Parquet export.
df_product_id = spark.read.format('parquet').load('Rec_P.parquet')

In [46]:
# Check the schema of the product ids as read back from Parquet.
df_product_id.printSchema()

root
 |-- product_id: integer (nullable = true)



In [47]:
# Quick look at the reloaded product ids.
df_product_id.show(n=2)

+----------+
|product_id|
+----------+
|  11118211|
|  11891181|
+----------+
only showing top 2 rows



In [48]:
# Fetch the stored recommendations for one customer from the reloaded table.
customer_id = 12999702
find_user_rec = new_user_recs.where(col('customer_id') == customer_id)
find_user_rec.show(truncate=False)

+-----------+-----------------------------------------------------------------------------------------------------------------+
|customer_id|recommendations                                                                                                  |
+-----------+-----------------------------------------------------------------------------------------------------------------+
|12999702   |[[58545325, 6.9045196], [73952674, 6.678771], [15212692, 6.664337], [44579109, 6.6239953], [72323554, 6.4681163]]|
+-----------+-----------------------------------------------------------------------------------------------------------------+



In [49]:
# Build the recommendation dictionary for the customer selected in `find_user_rec`.
# `result` stays None (rather than an empty string) when no row matched, so the
# "nothing found" case is not mistaken for a valid result of another type.
result = None
for user in find_user_rec.collect():
    recs = user['recommendations']
    for row in recs:
        print(row)

    # Check all recommended ids against the product table in a single Spark job,
    # rather than launching one lookup job per recommended product.
    rec_ids = [row['product_id'] for row in recs]
    known_ids = {
        found['product_id']
        for found in df_product_id.filter(df_product_id['product_id'].isin(rec_ids)).collect()
    }

    # (product_id, predicted rating) pairs, in the order stored in the array.
    lst = [
        (row['product_id'], row['rating'])
        for row in recs
        if row['product_id'] in known_ids
    ]
    dic_user_rec = {'customer_id' : user.customer_id, 'recommendations' :lst}
    result = dic_user_rec

Row(product_id=58545325, rating=6.904519557952881)
Row(product_id=73952674, rating=6.678771018981934)
Row(product_id=15212692, rating=6.664337158203125)
Row(product_id=44579109, rating=6.623995304107666)
Row(product_id=72323554, rating=6.468116283416748)


In [50]:
# Report which customer was queried, followed by the assembled recommendation dictionary.
print('Recommendation for: ', customer_id)
print(result)

Recommendation for:  12999702
{'customer_id': 12999702, 'recommendations': [(58545325, 6.904519557952881), (73952674, 6.678771018981934), (15212692, 6.664337158203125), (44579109, 6.623995304107666), (72323554, 6.468116283416748)]}
