In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark
import pandas

In [2]:
spark = SparkSession.builder.appName('ALS').getOrCreate()

In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, explode

In [4]:
data = spark.read.csv('Review_clean.csv',header=True,inferSchema=True)

In [5]:
data.show(5, truncate=True)

+-----------+----------+-----------------+------+------------------+--------------------+--------------------+
|customer_id|product_id|             name|rating|             title|             content|  review_information|
+-----------+----------+-----------------+------+------------------+--------------------+--------------------+
|     709310|  10001012| Lân Nguyễn Hoàng|     3|Ko dùng đc thẻ nhớ|Lúcđầu quên thông...|không thẻ_nhớ lúc...|
|   10701688|  10001012| Nguyễn Khánh Hòa|     5|   Cực kì hài lòng|Tiki giao hàng nh...|cực_kì hài_lòng h...|
|   11763074|  10001012|  Toàn Phạm Khánh|     5|   Cực kì hài lòng|chất lượng camera...|cực_kì hài_lòng c...|
|    9909549|  10001012|Nguyen Quang Minh|     5|      Rất hài lòng|Hàng được đóng gó...|hài_lòng hàng đón...|
|    1827148|  10001012|      Phạm Bá Đức|     5|   Cực kì hài lòng|dễ cài đặt, chất ...|cực_kì hài_lòng c...|
+-----------+----------+-----------------+------+------------------+--------------------+--------------------+
o

In [6]:
data_sub = data.select(['product_id','rating','customer_id'])

In [7]:
data_sub.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- customer_id: string (nullable = true)



In [8]:
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import isnan, when, count, col, udf

In [9]:
data_sub = data_sub.withColumn('rating', data_sub["rating"].cast(DoubleType()))
data_sub = data_sub.withColumn('customer_id', data_sub["customer_id"].cast(IntegerType()))
data_sub = data_sub.withColumn('product_id', data_sub["product_id"].cast(IntegerType()))

In [10]:
data_sub.select([count(when(col(c).isNull(), c)).alias(c) for c in data_sub.columns]).toPandas().T

Unnamed: 0,0
product_id,1656
rating,1718
customer_id,1687


In [11]:
data_sub = data_sub.na.drop(how='any')

In [12]:
data_sub.count()

361720

In [13]:
# Distinct users and products
users = data_sub.select("customer_id").distinct().count()
products = data_sub.select("product_id").distinct().count()
numerator = data_sub.count()

In [14]:
display(numerator, users, products)

361720

251467

4218

In [15]:
# Number of rating matrix could contain if no empty cells
denominator = users * products
denominator

1060687806

In [16]:
sparsity = 1- (numerator * 1.0/ denominator)
print("sparsity:"), sparsity

sparsity:


(None, 0.9996589759984476)

### Feature Transformation

In [17]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [18]:
indexer = StringIndexer(inputCol='product_id',
                        outputCol='product_id_idx')
data_indexed = indexer.fit(data_sub).transform(data_sub)

indexer1 = StringIndexer(inputCol='customer_id',
                        outputCol='customer_id_idx')
data_indexed = indexer1.fit(data_indexed).transform(data_indexed)

In [19]:
data_indexed.show(5, truncate=True)

+----------+------+-----------+--------------+---------------+
|product_id|rating|customer_id|product_id_idx|customer_id_idx|
+----------+------+-----------+--------------+---------------+
|  10001012|   3.0|     709310|        2458.0|       216788.0|
|  10001012|   5.0|   10701688|        2458.0|        24473.0|
|  10001012|   5.0|   11763074|        2458.0|        26843.0|
|  10001012|   5.0|    9909549|        2458.0|         3377.0|
|  10001012|   5.0|    1827148|        2458.0|          396.0|
+----------+------+-----------+--------------+---------------+
only showing top 5 rows



### Train model

In [20]:
# Chia dữ liệu train_test
# Smaller dataset so we will use 0.8/0.2
(training, test)= data_indexed.randomSplit([0.8,0.2])

In [21]:
# xây dựng model
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

als = ALS(maxIter=10,
          regParam=0.09,
          rank=25,
          userCol="customer_id_idx",
          itemCol="product_id_idx",
          ratingCol="rating",
          coldStartStrategy="drop",
          nonnegative=True)
model = als.fit(training)
# rank cho to hơn vì phim nhiều và user nhiều

In [22]:
data_indexed.select("rating").describe().show()

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|            361720|
|   mean| 4.474928121198717|
| stddev|1.0169193161968286|
|    min|               1.0|
|    max|               5.0|
+-------+------------------+



### Evaluate model

In [23]:
predictions = model.transform(test)

In [24]:
predictions.show(5)

+----------+------+-----------+--------------+---------------+----------+
|product_id|rating|customer_id|product_id_idx|customer_id_idx|prediction|
+----------+------+-----------+--------------+---------------+----------+
|   3524487|   5.0|    8915425|         148.0|        58123.0| 1.8633475|
|   3524487|   4.0|   19365193|         148.0|        16553.0|0.32141906|
|   3524487|   5.0|    7191994|         148.0|        20424.0|  3.555693|
|   3524487|   1.0|   17212146|         148.0|        37194.0| 2.5532603|
|   3524487|   4.0|   10565655|         148.0|        24194.0| 2.4223173|
+----------+------+-----------+--------------+---------------+----------+
only showing top 5 rows



In [25]:
from pyspark.ml.evaluation import RegressionEvaluator

In [26]:
evaluator = RegressionEvaluator(metricName='rmse',
                                labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean square error = "+ str(rmse))

Root-mean square error = 1.6546422747965646


### Tunning parameter

In [27]:
ALSExplicit = ALS( implicitPrefs=False, userCol="customer_id_idx", itemCol="product_id_idx", ratingCol="rating",
          coldStartStrategy="drop")

defaultModel = ALSExplicit.fit(training)

In [28]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

In [29]:
paramMapExplicit = ParamGridBuilder() \
                    .addGrid(ALSExplicit.rank, [30, 40,50]) \
                    .addGrid(ALSExplicit.maxIter, [5,10,15,20]) \
                    .addGrid(ALSExplicit.regParam, [0.1,0.01,0.001]) \
                    .addGrid(ALSExplicit.alpha, [2.0,3.0]) \
                    .build()

In [30]:
evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

In [None]:
CVALSExplicit = CrossValidator(estimator=ALSExplicit,
                            estimatorParamMaps=paramMapExplicit,
                            evaluator=evaluatorR,
                           numFolds=5)

CVModelEXplicit = CVALSExplicit.fit(training)

In [None]:
predictions_t = CVModelEXplicit.transform(test)

In [None]:
rmse_t = evaluator.evaluate(predictions_t)
print("Root-mean square error = "+ str(rmse_t))

In [None]:
als_t = ALS(maxIter=10,
          regParam=0.1,
          rank = 30,
          userCol="customer_id_idx",
          itemCol="product_id_idx",
          ratingCol="rating",
          coldStartStrategy="drop",
          nonnegative=True)
model_t = als_t.fit(training)

In [None]:
predictions_t = model_t.transform(test)

In [None]:
rmse_t = evaluator.evaluate(predictions_t)
print("Root-mean square error = "+ str(rmse_t))

In [None]:
# Nhận xét: 

### Make recommendations to all users

In [None]:
# Get 5 recommendations which have highest rating
user_recs = model_t.recommendForAllUsers(5)

In [None]:
user_recs.show(10,truncate=False)

In [None]:
user_recs.printSchema()

In [None]:
for user in user_recs.head(3):
    print(user)
    print("\n")

### Save to file

In [None]:
from time import time
t0 = time()

In [None]:
df_reviewer_reviewer_id = data_indexed.select('customer_id_idx', 'customer_id').distinct()

In [None]:
df_reviewer_reviewer_id.count()

In [None]:
df_reviewer_reviewer_id.show(5)

In [None]:
df_asin_asin_id = data_indexed.select('product_id_idx', 'product_id').distinct()

In [None]:
df_asin_asin_id.count()

In [None]:
df_asin_asin_id.show(5)

In [None]:
new_user_recs = user_recs.join(df_reviewer_reviewer_id, on=['customer_id_idx'], how="left")

In [None]:
new_user_recs.show(10, truncate=False)

In [None]:
new_user_recs.count()

### Save to disk

In [None]:
new_user_recs.write.parquet('Recommendation_U.parquet', mode='overwrite')
df_asin_asin_id.write.parquet('Recommendation_P.parquet', mode='overwrite')

In [None]:
time_duration = time() - 10
print(time_duration)

### Make recommendations to a particular user

In [None]:
customer_id = "5917275"
find_user_rec = new_user_recs.filter(new_user_recs['customer_id'] == customer_id)
user = find_user_rec.first()

lst=[]
for row in user['recommendations']:
    row_f = df_asin_asin_id.filter(df_asin_asin_id.product_id_idx == row['product_id_idx'])
    row_f_first = row_f.first()
    lst.append((row['product_id_idx'], row_f_first['product_id'], row['rating']))
dic_user_rec = {'customer_id' : user.customer_id, 'recommendations':lst}

In [None]:
dic_user_rec

### Đọc 2 file đã lưu để lấy dữ liệu đầu vào => Đề xuất

In [None]:
new_user_recs = spark.read.parquet('Recommendation_U.parquet')

In [None]:
new_user_recs.printSchema()

In [None]:
new_user_recs.show(2)

In [None]:
df_asin_asin_id = spark.read.parquet('Recommendation_P.parquet')

In [None]:
df_asin_asin_id.show(2)

In [None]:
customer_id = "5917275"
find_user_rec = new_user_recs.filter(new_user_recs['customer_id'] == customer_id)
find_user_rec.show(truncate=False)

In [None]:
result = ''
for user in find_user_rec.collect():
    lst=[]
    for row in user['recommendations']:
        print(row)
        row_f = df_asin_asin_id.filter(df_asin_asin_id.product_id_idx == row['product_id_idx'])
        row_f_first = row_f.first()
        lst.append((row['product_id_idx'], row_f_first['product_id'], row['rating']))
    dic_user_rec = {'customer_id' : user.customer_id, 'recommendations':lst}
    result = dic_user_rec