In [1]:
import findspark
# Must run before `import pyspark`: findspark.init() makes the local Spark installation importable.
findspark.init()

In [2]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [3]:
# Set the executor memory as a system property before the context is created.
# NOTE(review): with master='local' the work runs inside the driver JVM, so this executor
# setting may have no effect; spark.driver.memory is likely the knob that matters - confirm.
SparkContext.setSystemProperty('spark.executor.memory', '12g')
# Local master (no cluster); the app name identifies this job in the Spark UI.
sc = SparkContext(master='local',appName='Recommendation_product')



## Đọc dữ liệu

In [4]:
# Wrap the existing SparkContext in a SparkSession to get the DataFrame reader used below.
spark=SparkSession(sc)

In [64]:
# Product table read from Product.csv (first row is the header, column types are inferred).
# `recomendation` reads `product` as a global, so this name must stay bound to this DataFrame.
product = spark.read.csv('Product.csv',inferSchema=True,header=True)

In [5]:
# Customer reviews. inferSchema is requested, but the schema printed below reports every
# column as string, which is why `rating` is cast to double explicitly later on.
data = spark.read.csv('Review.csv',inferSchema=True,header=True)

In [6]:
# Preview the first rows of the raw reviews.
# NOTE(review): the rendered output includes customer names; consider selecting only
# non-personal columns before sharing the notebook.
data.show()

+---+-----------+----------+--------------------+--------------------+-------------------+------+--------------------+--------------------+
|_c0|customer_id|product_id|                name|           full_name|       created_time|rating|               title|             content|
+---+-----------+----------+--------------------+--------------------+-------------------+------+--------------------+--------------------+
|  0|     709310|  10001012|    Lân Nguyễn Hoàng|    Lân Nguyễn Hoàng|               null|     3|  Ko dùng đc thẻ nhớ|Lúcđầu quên thông...|
|  1|   10701688|  10001012|    Nguyễn Khánh Hòa|    Nguyễn Khánh Hòa|               null|     5|     Cực kì hài lòng|Tiki giao hàng nh...|
|  2|   11763074|  10001012|     Toàn Phạm Khánh|     Toàn Phạm Khánh|2019-04-17 15:42:45|     5|     Cực kì hài lòng|chất lượng camera...|
|  3|    9909549|  10001012|   Nguyen Quang Minh|                null|               null|     5|        Rất hài lòng|Hàng được đóng gó...|
|  4|    1827148|  1

In [7]:
# Check which column types Spark assigned when reading Review.csv.
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- created_time: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- title: string (nullable = true)
 |-- content: string (nullable = true)



In [8]:
# Keep only the three columns the recommender needs: item, rating and user.
data_sub = data.select('product_id', 'rating', 'customer_id')

In [9]:
# Distinct users and movies
reviewers = data_sub.select('customer_id').distinct().count()
product = data_sub.select('product_id').distinct().count()
numerator = data_sub.count()

In [10]:
display('tổng số khách hàng là',reviewers,'tổng số sản phẩm là',product,'số lượt đánh giá sản phẩm là',numerator)

'tổng số khách hàng là'

251972

'tổng số sản phẩm là'

4618

'số lượt đánh giá sản phẩm là'

362797

In [11]:
from pyspark.sql.functions import isnan, when, count, col, udf

## Xóa dữ liệu null

In [12]:
# Check for nulls: count the null entries in every column of data_sub,
# then transpose so that each column name becomes a row of the result.
data_sub.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           data_sub.columns]).toPandas().T

Unnamed: 0,0
product_id,1302
rating,1721
customer_id,853


In [13]:
# Drop the rows that contain nulls (the check above found some in all three columns).
data_sub=data_sub.na.drop()

In [14]:
# Re-run the per-column null count to confirm the drop removed them all.
data_sub.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           data_sub.columns]).toPandas().T

Unnamed: 0,0
product_id,0
rating,0
customer_id,0


## Sửa kiểu dữ liệu của các cột customer_id, product_id, rating => double

In [15]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

In [16]:
# Cast rating from string to double. Values that cannot be parsed as a number become null;
# the null check that follows reports 27 such rows.
data_sub=data_sub.withColumn("rating",data_sub.rating.cast(DoubleType()))

In [17]:
# Count nulls once more, this time to catch ratings that failed the cast to double.
data_sub.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           data_sub.columns]).toPandas().T

Unnamed: 0,0
product_id,0
rating,27
customer_id,0


Nhận xét: Sau khi chuyển rating => kiểu double thì dữ liệu phát sinh Null => xóa thêm lần nữa

In [18]:
# Remove the rows whose rating became null after the cast.
data_sub=data_sub.na.drop()

In [19]:
# Two StringIndexer estimators: one per ID column, each writing a numeric *_idx column.
indexer = StringIndexer(inputCol='product_id', outputCol='product_idx')
indexer1 = StringIndexer(inputCol='customer_id', outputCol='customer_idx')

# Learn the product_id -> product_idx mapping and apply it.
indexer_model = indexer.fit(data_sub)
product_indexed = indexer_model.transform(data_sub)

# Learn and apply the customer mapping on the frame that already carries product_idx.
indexer1_model = indexer1.fit(product_indexed)
data_indexed = indexer1_model.transform(product_indexed)

In [20]:
# Confirm that product_idx and customer_idx were added and that rating is now double.
data_indexed.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_idx: double (nullable = false)
 |-- customer_idx: double (nullable = false)



Nhận xét: các cột rating, product_idx và customer_idx đều đã ở kiểu double (hai cột gốc product_id và customer_id vẫn là string)

## Build Model ALS

In [21]:
# 80/20 train/test split. The seed is fixed so that the split, and therefore the RMSE
# reported below, is the same after a kernel restart instead of changing on every run.
training, testing = data_indexed.randomSplit([0.8,0.2], seed=42)

In [22]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [23]:
# ALS estimator configured on the indexed user/item columns.
als_sub = ALS(
    # which columns hold the user, the item and the observed rating
    userCol='customer_idx',
    itemCol='product_idx',
    ratingCol='rating',
    # model size and optimisation settings
    rank=150,
    maxIter=20,
    regParam=0.44,
    nonnegative=True,
    # rows that cannot be scored are dropped from the prediction output
    coldStartStrategy='drop',
)

In [24]:
# Fit the ALS model on the training split only; the test split is kept for evaluation.
model=als_sub.fit(training)

In [25]:
# Summary statistics of the training split.
# product_id / customer_id are strings, so their min/max below are lexicographic and their
# mean/stddev carry no meaning; the rating column is the one worth reading here.
training.describe().show()

+-------+--------------------+------------------+-----------------+-----------------+-----------------+
|summary|          product_id|            rating|      customer_id|      product_idx|     customer_idx|
+-------+--------------------+------------------+-----------------+-----------------+-----------------+
|  count|              288682|            288682|           288682|           288682|           288682|
|   mean|2.4407264031113822E7| 4.474757691854705|9190223.706258098|587.6582952868554|93191.00962650945|
| stddev|2.3789192574856587E7|1.0159247584633284|6308389.374471655|750.8995450978624|  78297.775142811|
|    min|            10001012|               1.0|               10|              0.0|              0.0|
|    max|             9996258|               5.0|          9999890|           4210.0|         251121.0|
+-------+--------------------+------------------+-----------------+-----------------+-----------------+



In [26]:
# Same summary for the held-out split, to check that its rating distribution resembles training.
testing.describe().show()

+-------+-------------------+------------------+-----------------+-----------------+-----------------+
|summary|         product_id|            rating|      customer_id|      product_idx|     customer_idx|
+-------+-------------------+------------------+-----------------+-----------------+-----------------+
|  count|              72367|             72367|            72367|            72367|            72367|
|   mean|2.439480028436995E7| 4.476418809678445|9134430.242569126| 582.635455387124|93620.34816974588|
| stddev|2.376741217735122E7|1.0197826874692224|6305220.237035178|744.4526577916612|78337.60823268935|
|    min|           10001012|               1.0|              100|              0.0|              0.0|
|    max|            9996258|               5.0|          9999769|           4204.0|         251118.0|
+-------+-------------------+------------------+-----------------+-----------------+-----------------+



## Đánh giá kết quả 

In [27]:
# Score the test split; the result carries the `prediction` column used by the evaluator below.
predictions = model.transform(testing)

In [28]:
# RMSE between the observed rating and the ALS prediction on the test split.
evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='rating',
    metricName='rmse',
)
rmse = evaluator.evaluate(predictions)
# print() applies str() to the float itself, so no explicit conversion is needed.
print('Root mean squared error = ', rmse)

Root mean squared error =  1.1105112311977445


Nhận xét: RMSE của model là 1.11 (đã tuning nhiều lần) nhưng vẫn chưa bé hơn độ lệch chuẩn (stddev) của rating (≈ 1.02)

## Đề xuất cho User

In [29]:
from pyspark.sql.functions import col,explode

In [30]:
# Top-5 recommended products for every customer known to the fitted model.
user_recs = model.recommendForAllUsers(5)



In [31]:
# Peek at the first three rows: each holds a customer_idx and a list of
# (product_idx, predicted rating) rows.
for user in user_recs.head(3):
    print(user)

Row(customer_idx=1, recommendations=[Row(product_idx=4026, rating=5.252419471740723), Row(product_idx=3926, rating=4.952212810516357), Row(product_idx=3993, rating=4.919150352478027), Row(product_idx=3478, rating=4.8336405754089355), Row(product_idx=3919, rating=4.791848182678223)])
Row(customer_idx=3, recommendations=[Row(product_idx=4026, rating=4.8788161277771), Row(product_idx=3993, rating=4.539760112762451), Row(product_idx=3926, rating=4.53629207611084), Row(product_idx=3478, rating=4.496156215667725), Row(product_idx=3919, rating=4.446391582489014)])
Row(customer_idx=4, recommendations=[Row(product_idx=4026, rating=5.441257476806641), Row(product_idx=3926, rating=5.080391883850098), Row(product_idx=3993, rating=5.045040607452393), Row(product_idx=3478, rating=5.023173809051514), Row(product_idx=3919, rating=4.960630893707275)])


In [32]:
# Lookup table: numeric customer_idx -> original customer_id.
df_customer_id= data_indexed.select('customer_idx','customer_id').distinct()

In [33]:
# Preview the customer lookup table.
df_customer_id.show(5)

+------------+-----------+
|customer_idx|customer_id|
+------------+-----------+
|     10594.0|    8955703|
|       359.0|   11110733|
|     13897.0|   14425088|
|    230391.0|    8051250|
|      8448.0|     270593|
+------------+-----------+
only showing top 5 rows



In [34]:
# Lookup table: numeric product_idx -> original product_id (read as a global by rec_sys_CF).
df_product_idx=data_indexed.select('product_idx','product_id').distinct()

In [35]:
# Preview the product lookup table.
df_product_idx.show(5)

+-----------+----------+
|product_idx|product_id|
+-----------+----------+
|     2868.0|  12011510|
|     2872.0|  15222490|
|     2317.0|   1530039|
|     3687.0|  19899003|
|     1341.0|  26627033|
+-----------+----------+
only showing top 5 rows



In [36]:
# Attach the original customer_id to every recommendation row.
# The key prints as an integer in user_recs and as a double in df_customer_id; the preview
# below shows the rows still match. Left join keeps every row of user_recs.
new_user_recs = user_recs.join(df_customer_id, on=['customer_idx'],how='left')

In [37]:
# Preview the joined result: customer_idx, its recommendations and the original customer_id.
new_user_recs.show(10)

+------------+--------------------+-----------+
|customer_idx|     recommendations|customer_id|
+------------+--------------------+-----------+
|           1|[{4026, 5.2524195...|    7280719|
|           3|[{4026, 4.878816}...|    7377207|
|           4|[{4026, 5.4412575...|    1064154|
|           5|[{4026, 5.5898585...|    1425077|
|           6|[{4026, 5.477124}...|    1046981|
|           7|[{4026, 5.506749}...|    6177374|
|           8|[{4026, 5.4872737...|    6844844|
|           9|[{4026, 5.4044127...|   10371235|
|          12|[{4026, 5.2238784...|   11575918|
|          13|[{4026, 5.4445252...|     151415|
+------------+--------------------+-----------+
only showing top 10 rows



In [42]:
def rec_sys_CF(new_user_recs, id, product_lookup=None):
    """Return the stored recommendations of one customer with original product ids.

    Parameters
    ----------
    new_user_recs : DataFrame
        One row per customer with the columns ``customer_id`` and
        ``recommendations`` (a list of rows holding ``product_idx`` and ``rating``).
    id : int or str
        Original customer id to look up. The name shadows the builtin ``id`` but is
        kept so existing callers keep working.
    product_lookup : DataFrame, optional
        Mapping table with the columns ``product_idx`` and ``product_id``.
        Defaults to the notebook-level ``df_product_idx``.

    Returns
    -------
    dict
        ``{'customer_id': ..., 'recommendations': [(product_idx, product_id, rating), ...]}``
        in the order stored on the customer row. When the customer is not present,
        ``recommendations`` is an empty list and ``customer_id`` is ``str(id)``.
    """
    if product_lookup is None:
        # Fall back to the mapping table built earlier in the notebook.
        product_lookup = df_product_idx

    user = new_user_recs.filter(new_user_recs['customer_id'] == id).first()
    if user is None:
        # Unknown customer: return an empty result instead of failing on None['recommendations'].
        return {'customer_id': str(id), 'recommendations': []}

    recs = list(user['recommendations'])
    wanted = [rec['product_idx'] for rec in recs]

    # Resolve all product indices with a single query instead of one Spark job per item.
    if wanted:
        matches = product_lookup.filter(product_lookup['product_idx'].isin(wanted)).collect()
    else:
        matches = []
    # The lookup table stores the index as a double (e.g. 4026.0) while the recommendation
    # rows carry an integer, so both sides are normalised to int before matching.
    idx_to_id = {int(row['product_idx']): row['product_id'] for row in matches}

    lst = [
        (rec['product_idx'], idx_to_id.get(int(rec['product_idx'])), rec['rating'])
        for rec in recs
    ]
    return {'customer_id': user['customer_id'], 'recommendations': lst}

In [43]:
# Example lookup: customer 709310 is the first customer shown in the review preview above.
results=rec_sys_CF(new_user_recs,709310)

In [45]:
# Show the dictionary: tuples are (product_idx, product_id, predicted rating).
results

{'customer_id': '709310',
 'recommendations': [(4026, '66251373', 3.167637348175049),
  (3926, '73238633', 2.9528491497039795),
  (3993, '50560427', 2.9364469051361084),
  (3478, '53035877', 2.908506393432617),
  (3919, '72520984', 2.877462148666382)]}

## Chuẩn hóa dữ liệu cho User

In [46]:
from pyspark.sql.functions import col,explode

In [79]:
def recomendation(results, product_df=None):
    """Print the id and name of every product recommended in ``results``.

    Parameters
    ----------
    results : dict
        Output of ``rec_sys_CF``: ``results['recommendations']`` is a list of
        tuples whose second element is the original product id.
    product_df : DataFrame, optional
        Product table with the columns ``item_id`` and ``name``. Defaults to the
        notebook-level ``product`` DataFrame; pass it explicitly to avoid depending
        on that global still being bound to the product table.

    Returns
    -------
    None
        The matching rows are printed with ``show``; their order is whatever the
        filter returns, not the recommendation ranking.
    """
    if product_df is None:
        # Fall back to the product table loaded from Product.csv.
        product_df = product
    product_ids = [rec[1] for rec in results['recommendations']]
    print('Đây là 5 sản phẩm đề xuất cho người dùng là')
    item = product_df.select('item_id','name').filter(product_df['item_id'].isin(product_ids))
    item.show(truncate=False)

In [80]:
# Print the product names for the recommendations computed above.
recomendation(results)

Đây là 5 sản phẩm đề xuất cho người dùng là
+--------+------------------------------------------------------------------------------------------------------------------------------------------------------+
|item_id |name                                                                                                                                                  |
+--------+------------------------------------------------------------------------------------------------------------------------------------------------------+
|66251373|Flycam Bugs 20 EIS Gimbal 1 trục + chống rung điện tử - Hàng chính hãng                                                                               |
|72520984|Màn Hình Máy Tính Dell E2720HS 27 Inch FHD (1920 x 1080) 5ms 60Hz IPS Stereo Speakers - Hàng Chính Hãng                                               |
|73238633|Ổ cứng di động External SSD Sandisk Extreme V2 E61 1TB USB 3.2 Gen 2 SDSSDE61-1T00-G25 - Hàng Chính Hãng                                