# CHAPTER 9 - EXERCISE 4

In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.sql import SparkSession

## Nhập dữ liệu

In [2]:
sc= SparkContext(master= 'local', appName= 'Chapter 9 - Exercise 4')
ss= SparkSession(sc)

In [3]:
path= '/Users/vovanthuong/Desktop/9 - Big Data in Machine Learning/Data/Chapter9/LDS9_Data_Day_7/Beauty_5.json'
df= ss.read.json(path)

In [4]:
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



In [5]:
# Số lượng review
df.count()

198502

In [6]:
# Số lượng item
df.select('asin').distinct().count()

12101

In [7]:
# Số lượng reviewerID
df.select('reviewerID').distinct().count()

22363

In [8]:
df= df.dropna(how= 'any', subset= ['asin', 'reviewerID', 'overall'])

## Tạo tập train và test

In [9]:
train, test= df.randomSplit([0.8, 0.2])

## Xây dựng mô hình

In [10]:
from pyspark.ml.feature import SQLTransformer, StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml import Pipeline

# 1
select= SQLTransformer(statement= 'SELECT asin, reviewerID, overall FROM __THIS__')
# 2
asin_laber_to_indexer= StringIndexer(inputCol= 'asin', outputCol= 'asin_idx')
# 3
reviewerID_laber_to_indexer= StringIndexer(inputCol= 'reviewerID', outputCol= 'reviewerID_idx')
# 4
als= ALS(maxIter= 25, regParam= 0.05,
         userCol= 'reviewerID_idx', itemCol= 'asin_idx', ratingCol= 'overall')
# Pipe
pipe_als= Pipeline(stages= [select, asin_laber_to_indexer, reviewerID_laber_to_indexer, als])

In [11]:
pipe_als_model= pipe_als.fit(train)

## Đánh giá kết quả của mô hình

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator= RegressionEvaluator(labelCol= 'overall', predictionCol= 'prediction',metricName= 'rmse')

### Trên tập train

In [13]:
train_result= pipe_als_model.transform(train)
evaluator.evaluate(train_result)

0.18521408810545173

### Trên tập test

In [14]:
# Lọc các reviewerID chưa xuất hiện trong tệp train
from pyspark.sql.functions import col
test= test.filter(col('reviewerID').isin(pipe_als_model.stages[2].labels))

In [15]:
test_result= pipe_als_model.transform(test)
evaluator.evaluate(test_result)

1.4488662855124683

## Recomment cho toàn bộ user

In [16]:
recomment_for_all= pipe_als_model.stages[3].recommendForAllUsers(numItems= 10)

## Recomment cho một user

In [29]:
from pyspark.ml.feature import IndexToString
from pyspark.sql.functions import posexplode, col

asin_indexer_to_label = IndexToString(inputCol="asin_idx", 
                                            outputCol="asin", 
                                            labels= pipe_als_model.stages[1].labels)
def recommend_for_user(als_model, reviewerID, numItems):
    # Chuyển reviewerID thành reviewerID_idx
    reviewerID= ss.createDataFrame([(reviewerID,)], ['reviewerID', ])
    reviewerID= pipe_als_model.stages[2].transform(reviewerID)
    reviewerID_idx= reviewerID.select('reviewerID_idx').rdd.flatMap(lambda x: x).collect()[0]
    # Đề xuất một số lượng item cao nhất cho tất cả user
    data= als_model.recommendForAllUsers(numItems= numItems)
    # Lọc lại user quan tâm
    data= data.where(data.reviewerID_idx == reviewerID_idx)
    # Đưa cột reviewerID vào data
    data= data.join(reviewerID, on=['reviewerID_idx'], how='inner')
    # Chuyển đổi dữ liệu
    data= data.select('reviewerID', 'reviewerID_idx',
                      posexplode("recommendations").alias('No', 'recommendations'))
    data= data.select('reviewerID', 'reviewerID_idx', 'No',
                      (col('recommendations')['asin_idx']).alias('asin_idx'),
                      (col('recommendations')['rating']).alias('rating'))
    data= asin_indexer_to_label.transform(data)
    data.show()
    asin= data.select('asin')
    return asin

In [35]:
asin_recomment= recommend_for_user(als_model= pipe_als_model.stages[-1], 
                                   reviewerID= 'A2V5R832QCSOMX', numItems= 10)

+--------------+--------------+---+--------+---------+----------+
|    reviewerID|reviewerID_idx| No|asin_idx|   rating|      asin|
+--------------+--------------+---+--------+---------+----------+
|A2V5R832QCSOMX|             0|  0|    4830| 5.975937|B000P9FP10|
|A2V5R832QCSOMX|             0|  1|    5751| 5.913575|B00447EWNG|
|A2V5R832QCSOMX|             0|  2|    8480|5.8183026|B0072AJLNI|
|A2V5R832QCSOMX|             0|  3|    9900| 5.817519|B000XTBEY4|
|A2V5R832QCSOMX|             0|  4|    7457|5.7841544|B0030HPY74|
|A2V5R832QCSOMX|             0|  5|   10803| 5.761909|B001LNOCXQ|
|A2V5R832QCSOMX|             0|  6|    9021| 5.755734|B003ZYKUYY|
|A2V5R832QCSOMX|             0|  7|   10630| 5.750055|B0068J5K52|
|A2V5R832QCSOMX|             0|  8|    6813|5.7496414|B0002B0R14|
|A2V5R832QCSOMX|             0|  9|    8700| 5.724177|B000CNIHEG|
+--------------+--------------+---+--------+---------+----------+



# NHÁP

In [37]:
path= '/Users/vovanthuong/Desktop/9 - Big Data in Machine Learning/LDS9_DeThi/Du lieu cung cap/ratings_Grocery_and_Gourmet_Food.csv'
df_test= ss.read.csv(path, header= False, inferSchema= True)
df_test= df_test.toDF('reviewerID', 'asin', 'overall', 'unixReviewTime')

In [43]:
from pyspark.sql.functions import col

df_test.where(condition= col('reviewerID') == 'A3ABZBEG3KZ0L').count()

0

In [41]:
df_test.where(condition= col('reviewerID') == 'A2BSUJYATHI7WW').show()

+--------------+----------+-------+--------------+
|    reviewerID|      asin|overall|unixReviewTime|
+--------------+----------+-------+--------------+
|A2BSUJYATHI7WW|B002YRBALU|    4.0|    1395619200|
|A2BSUJYATHI7WW|B007GPARA0|    4.0|    1395619200|
+--------------+----------+-------+--------------+



In [42]:
df_test.where(condition= col('reviewerID') == 'A26LKBXTSIHQV2').show()

+--------------+----------+-------+--------------+
|    reviewerID|      asin|overall|unixReviewTime|
+--------------+----------+-------+--------------+
|A26LKBXTSIHQV2|1613170416|    5.0|    1398902400|
+--------------+----------+-------+--------------+



In [46]:
df_test.cache()

<bound method DataFrame.cache of DataFrame[reviewerID: string, asin: string, overall: double, unixReviewTime: int]>

In [47]:
df_test.is_cached

True