In [None]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

from pyspark.sql import types
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import *

import matplotlib.pyplot as plt
import seaborn as sb
import warnings
warnings.filterwarnings("ignore")

In [None]:
sc = SparkContext()
spark = SparkSession(sc)

In [None]:
data = spark.read.csv('ReviewRaw.csv',inferSchema=True,header=True)
print(data.count())
data.printSchema()

365821
root
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- created_time: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- title: string (nullable = true)
 |-- content: string (nullable = true)



In [None]:
data = data.withColumn('rating',data.rating.cast(IntegerType()))

In [None]:
data.show(1,vertical=True)

-RECORD 0----------------------------
 customer_id  | 709310               
 product_id   | 10001012             
 name         | Lân Nguyễn Hoàng     
 full_name    | Lân Nguyễn Hoàng     
 created_time | null                 
 rating       | 3                    
 title        | Ko dùng đc thẻ nhớ   
 content      | Lúcđầu quên thông... 
only showing top 1 row



In [None]:
data = data[['customer_id','product_id','rating']]
data.show(3)

+-----------+----------+------+
|customer_id|product_id|rating|
+-----------+----------+------+
|     709310|  10001012|     3|
|   10701688|  10001012|     5|
|   11763074|  10001012|     5|
+-----------+----------+------+
only showing top 3 rows



In [None]:
data.select([count(when(isnan(x),x)).alias(x) for x in data.columns]).show()
data.select([count(when(col(x).isNull(),x)).alias(x) for x in data.columns]).show()

+-----------+----------+------+
|customer_id|product_id|rating|
+-----------+----------+------+
|          0|         0|     0|
+-----------+----------+------+

+-----------+----------+------+
|customer_id|product_id|rating|
+-----------+----------+------+
|          1|       854|  1752|
+-----------+----------+------+



In [None]:
data = data.dropna()
data.select([count(when(col(x).isNull(),x)).alias(x) for x in data.columns]).show()

+-----------+----------+------+
|customer_id|product_id|rating|
+-----------+----------+------+
|          0|         0|     0|
+-----------+----------+------+



In [None]:
print(data.count())
data = data.dropDuplicates()
data.count()

364069


360181

In [None]:
data.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- rating: integer (nullable = true)



In [None]:
users = data[['customer_id']].distinct().count()
items = data[['product_id']].distinct().count()
numerator = data.count()

denominator = users * items
sparsity = 1 - (numerator*1.0/denominator)

users, items, numerator, denominator, sparsity

(251467, 4218, 360181, 1060687806, 0.9996604269437599)

In [None]:
indexer = StringIndexer(inputCols=['product_id','customer_id'],outputCols=['item','user']).fit(data)
data = indexer.transform(data)
print(data.columns)

['customer_id', 'product_id', 'rating', 'item', 'user']


In [None]:
data.show(3)

+-----------+----------+------+------+-------+
|customer_id|product_id|rating|  item|   user|
+-----------+----------+------+------+-------+
|    7248606|  10001353|     5|2041.0| 3028.0|
|   15191237|  10001355|     5|1865.0|33222.0|
|   13146900|  10001382|     5| 243.0|29444.0|
+-----------+----------+------+------+-------+
only showing top 3 rows



In [None]:
train,test = data[['user','item','rating']].randomSplit([.8,.2],7)
train.describe().show()
test.describe().show()
data[['rating']].describe().show()

+-------+-----------------+-----------------+------------------+
|summary|             user|             item|            rating|
+-------+-----------------+-----------------+------------------+
|  count|           288245|           288245|            288245|
|   mean|93734.93237003243|588.8488334576489| 4.473909347950528|
| stddev|78439.90558788332|752.4367759924091|1.0175481094233951|
|    min|              0.0|              0.0|                 1|
|    max|         251466.0|           4217.0|                 5|
+-------+-----------------+-----------------+------------------+

+-------+-----------------+-----------------+------------------+
|summary|             user|             item|            rating|
+-------+-----------------+-----------------+------------------+
|  count|            71936|            71936|             71936|
|   mean| 93369.1532612322|581.5716887233096| 4.471516347864768|
| stddev|78421.46306473632|742.5598619617592|1.0203165697498409|
|    min|              0

In [None]:
train.cache()
train.is_cached

True

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [None]:
loops = [5,10,15,20]
beta = [1,0.5,0.1,0.05,0.01]

for loop in loops:
    for b in beta:
        model = ALS(maxIter=loop, regParam=b,
                userCol='user',itemCol='item',
                ratingCol='rating',coldStartStrategy='drop',
                nonnegative=True).fit(train)

        predictions = model.transform(test)
        evaluator = RegressionEvaluator(metricName='rmse',labelCol='rating',
                                        predictionCol='prediction')
        print('maxIter=%d, regParam=%f | rmse:'%(loop,b),evaluator.evaluate(predictions))

maxIter=5, regParam=1.000000 | rmse: 1.5836253179625084
maxIter=5, regParam=0.500000 | rmse: 1.2821046824233462
maxIter=5, regParam=0.100000 | rmse: 1.9196899044921663
maxIter=5, regParam=0.050000 | rmse: 2.571211610762031
maxIter=5, regParam=0.010000 | rmse: 5.07590656044382
maxIter=10, regParam=1.000000 | rmse: 1.3962515839053398
maxIter=10, regParam=0.500000 | rmse: 1.142632323050807
maxIter=10, regParam=0.100000 | rmse: 1.5718454759565526
maxIter=10, regParam=0.050000 | rmse: 2.063420467862263
maxIter=10, regParam=0.010000 | rmse: 3.981093829961655
maxIter=15, regParam=1.000000 | rmse: 1.3685031945594914
maxIter=15, regParam=0.500000 | rmse: 1.1218496613175126
maxIter=15, regParam=0.100000 | rmse: 1.3755926258206683
maxIter=15, regParam=0.050000 | rmse: 1.815367010620138
maxIter=15, regParam=0.010000 | rmse: 3.3898806755061055
maxIter=20, regParam=1.000000 | rmse: 1.3611857887965828
maxIter=20, regParam=0.500000 | rmse: 1.1168116023562265
maxIter=20, regParam=0.100000 | rmse: 1.273

In [None]:
train.unpersist()
train.is_cached

False

In [None]:
model = ALS(maxIter=25, regParam=0.5,
            userCol='user',itemCol='item',
            ratingCol='rating',coldStartStrategy='drop',
            nonnegative=True).fit(data[['user','item','rating']])

In [None]:
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName='rmse',labelCol='rating',
                                predictionCol='prediction')
print('root mean square error:',evaluator.evaluate(predictions))

root mean square error: 0.6127984420268497


In [None]:
user_recs = model.recommendForAllUsers(6)
print(user_recs.count())
print(user_recs.columns)
user_recs.printSchema()

251467
['user', 'recommendations']
root
 |-- user: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [None]:
for x in user_recs.head(2):
    print(x)
    print()

Row(user=148, recommendations=[Row(item=4156, rating=4.699165344238281), Row(item=4031, rating=4.582298755645752), Row(item=4204, rating=4.556765556335449), Row(item=4120, rating=4.437543869018555), Row(item=4049, rating=4.433034420013428), Row(item=3930, rating=4.416927337646484)])

Row(user=463, recommendations=[Row(item=4156, rating=5.310268402099609), Row(item=4031, rating=5.186141014099121), Row(item=4204, rating=5.157317638397217), Row(item=4120, rating=5.01439905166626), Row(item=4049, rating=5.00749397277832), Row(item=3930, rating=4.979509353637695)])



In [None]:
data.columns

['customer_id', 'product_id', 'rating', 'item', 'user']

In [None]:
user_id = data.select('user','customer_id').distinct().orderBy('user')

In [None]:
new_user_recs = user_recs.join(user_id, on='user',how='left')
new_user_recs.printSchema()

root
 |-- user: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)
 |-- customer_id: string (nullable = true)



In [None]:
data_all = new_user_recs.select('customer_id','user',explode('recommendations'))
data_all = data_all.withColumn('item',data_all.col.getField('item'))\
                   .withColumn('rating',data_all.col.getField('rating'))

print(data_all.columns)
data_all.show(5)

['customer_id', 'user', 'col', 'item', 'rating']
+-----------+----+-----------------+----+---------+
|customer_id|user|              col|item|   rating|
+-----------+----+-----------------+----+---------+
|    1340635| 148|{4156, 4.6991653}|4156|4.6991653|
|    1340635| 148|{4031, 4.5822988}|4031|4.5822988|
|    1340635| 148|{4204, 4.5567656}|4204|4.5567656|
|    1340635| 148| {4120, 4.437544}|4120| 4.437544|
|    1340635| 148|{4049, 4.4330344}|4049|4.4330344|
+-----------+----+-----------------+----+---------+
only showing top 5 rows



In [None]:
item_id = data.select('item','product_id').distinct().orderBy('item')
item_id.count()

4218

In [None]:
data_all = data_all.join(item_id,on='item',how='left')
print(data_all.columns)
print(data_all.count())
data_all.show(5)

['item', 'customer_id', 'user', 'col', 'rating', 'product_id']
1508802
+----+-----------+----+-----------------+---------+----------+
|item|customer_id|user|              col|   rating|product_id|
+----+-----------+----+-----------------+---------+----------+
|4156|    1340635| 148|{4156, 4.6991653}|4.6991653|  69507754|
|4031|    1340635| 148|{4031, 4.5822988}|4.5822988|  66251373|
|4204|    1340635| 148|{4204, 4.5567656}|4.5567656|  76732229|
|4120|    1340635| 148| {4120, 4.437544}| 4.437544|  46134868|
|4049|    1340635| 148|{4049, 4.4330344}|4.4330344|  71293311|
+----+-----------+----+-----------------+---------+----------+
only showing top 5 rows



In [None]:
finall_rec = data_all[['customer_id','product_id','rating']].filter(col('rating')>3.5)
finall_rec.count()

1311794

In [None]:
def user_recitems(user_id):
    finall_rec.filter(col('customer_id')==user_id).orderBy(col('rating').desc()).show(5)

In [None]:
user_recitems('1340635')

+-----------+----------+---------+
|customer_id|product_id|   rating|
+-----------+----------+---------+
|    1340635|  69507754|4.6991653|
|    1340635|  66251373|4.5822988|
|    1340635|  76732229|4.5567656|
|    1340635|  46134868| 4.437544|
|    1340635|  71293311|4.4330344|
+-----------+----------+---------+
only showing top 5 rows



## save files

In [None]:
finall_rec.write.parquet("finall_rec.parquet",mode='overwrite')

In [None]:
finall_pq = spark.read.parquet('finall_rec.parquet')
finall_pq.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- rating: float (nullable = true)



In [None]:
finall_pq.count()

1311794

In [None]:
data_all.write.parquet("data_all.parquet",mode='overwrite')

In [None]:
all_pq = spark.read.parquet('data_all.parquet')
print(all_pq.count())
all_pq.printSchema()

1508802
root
 |-- item: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- user: integer (nullable = true)
 |-- col: struct (nullable = true)
 |    |-- item: integer (nullable = true)
 |    |-- rating: float (nullable = true)
 |-- rating: float (nullable = true)
 |-- product_id: string (nullable = true)



## use files

In [None]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import warnings
warnings.filterwarnings("ignore")

In [None]:
sc = SparkContext()
spark = SparkSession(sc)

In [None]:
finall_pq = spark.read.parquet('finall_rec.parquet')
finall_pq.count()
finall_pq.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- rating: float (nullable = true)



In [None]:
def user_recitems(user_id):
    finall_pq.filter(col('customer_id')==user_id).orderBy(col('rating').desc()).show(5)

In [None]:
user_recitems('13146900')

+-----------+----------+---------+
|customer_id|product_id|   rating|
+-----------+----------+---------+
|   13146900|  69507754| 5.236185|
|   13146900|  66251373|5.1209917|
|   13146900|  76732229|5.0829663|
|   13146900|  73238633| 4.942435|
|   13146900|  71293311| 4.936884|
+-----------+----------+---------+
only showing top 5 rows



In [None]:
sc.stop()

# Nhận xét
- Sử dụng ALS:
- Chạy vòng lặp để tìm maxIter và regParam thích hợp
- Chọn maxIter=25, regParam=0.5 do có rmse thấp nhất
- Chỉ đề xuất các items có rating dự đoán cao hơn 3.5
- Đánh giá:
    - Có thể áp dụng mô hình
    