In [1]:
import findspark 
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import isnan, when, count, col
from pyspark.ml.evaluation import RegressionEvaluator 
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer 
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

### About Data
- user_id - ID of the reviewer
- product_id of the product
- user - name of the reviewer
- rating - rating of the product

### Load Data and Pre process

In [4]:
spark = SparkSession.builder.appName('Recommendation_system').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/30 20:14:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/30 20:14:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/03/30 20:14:56 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [5]:
data = spark.read.csv("Products_ThoiTrangNam_rating_raw.csv",header=True
                      ,inferSchema=True, sep='\t')

In [6]:
data.count()

1024482

In [7]:
data.show(5)

+----------+-------+------------------+------+
|product_id|user_id|              user|rating|
+----------+-------+------------------+------+
|       190|      1|      karmakyun2nd|     5|
|       190|      2|  tranquangvinh_vv|     5|
|       190|      3|nguyenquoctoan2005|     5|
|       190|      4|    nguyenthuyhavi|     5|
|       190|      5|      luonganh5595|     5|
+----------+-------+------------------+------+
only showing top 5 rows



In [8]:
data.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user: string (nullable = true)
 |-- rating: integer (nullable = true)



In [9]:
data_sub = data.select(['product_id', 'rating', 'user_id'])

In [10]:
data_sub.count()

1024482

In [11]:
### check isnull
data_sub.select([count(when(col(c).isNull(), c)).alias(c) for c in data_sub.columns]).toPandas().T

Unnamed: 0,0
product_id,0
rating,0
user_id,0


In [12]:
### distinct users and and Movies
users = data_sub.select("user_id").distinct().count() 
products = data_sub.select("product_id").distinct().count() 
numerator = data_sub.count()

                                                                                

In [13]:
display(numerator, users, products)

1024482

650636

31267

In [14]:
# Number of ratings matrix could contain if no empty cells
denominator = users * products 
### ratio 1M rating/20B --- sparse matrix
denominator

20343435812

In [15]:
#Calculating sparsity
## hard to expect good rmse
sparsity = 1 - (numerator*1.0 / denominator) 
print ("Sparsity: "), sparsity

Sparsity: 


(None, 0.9999496406600406)

In [16]:
data_sub.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- user_id: integer (nullable = true)



### StringIndexer

In [17]:
# Create an indexer
indexer = StringIndexer(inputCol='product_id', outputCol='product_id_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(data_sub)

# Indexer creates a new column with numeric index values
data_indexed = indexer_model.transform(data_sub)

# Repeat the process for the other categorical feature
indexer1 = StringIndexer(inputCol='user_id',
                         outputCol='user_id_idx')
indexer1_model = indexer1.fit(data_indexed) 
data_indexed = indexer1_model.transform(data_indexed)

[Stage 32:>                                                         (0 + 1) / 1]                                                                                

In [18]:
data_indexed.show(5, truncate=True)

23/03/30 20:15:11 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB
+----------+------+-------+--------------+-----------+
|product_id|rating|user_id|product_id_idx|user_id_idx|
+----------+------+-------+--------------+-----------+
|       190|     5|      1|         367.0|   113654.0|
|       190|     5|      2|         367.0|    50699.0|
|       190|     5|      3|         367.0|   284299.0|
|       190|     5|      4|         367.0|   376354.0|
|       190|     5|      5|         367.0|     6462.0|
+----------+------+-------+--------------+-----------+
only showing top 5 rows



In [19]:
### check isnull again
data_indexed.select([count(when(col(c).isNull(), c)).alias(c) for c in data_indexed.columns]).toPandas().T

Unnamed: 0,0
product_id,0
rating,0
user_id,0
product_id_idx,0
user_id_idx,0


### Build Model

In [20]:
# use 0.8 / 0.2
(training, test) = data_indexed.randomSplit([0.8, 0.2])

In [21]:
als = ALS(maxIter=5,
          regParam=0.01,
          rank = 25,
          userCol="user_id_idx",
          itemCol="product_id_idx",
          ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True)
model = als.fit(training)

23/03/30 20:15:14 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


                                                                                

23/03/30 20:15:16 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB




23/03/30 20:15:18 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 39:>                                                         (0 + 7) / 7]

23/03/30 20:15:19 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 42:>                                                         (0 + 0) / 7]

23/03/30 20:15:23 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB




23/03/30 20:15:25 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 47:>                                                        (0 + 0) / 10]

23/03/30 20:15:27 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 47:>                                                       (0 + 10) / 10]

23/03/30 20:15:28 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/03/30 20:15:28 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
23/03/30 20:15:29 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 48:>                                                       (0 + 10) / 10]

23/03/30 20:15:31 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 49:>                                                       (0 + 10) / 10]

23/03/30 20:15:35 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB




23/03/30 20:15:38 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 52:>                                                        (0 + 0) / 10]

23/03/30 20:15:41 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 52:>                                                       (0 + 10) / 10]

23/03/30 20:15:44 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 54:>                                                        (0 + 0) / 10]

23/03/30 20:15:47 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 54:>                                                       (0 + 10) / 10]

23/03/30 20:15:50 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 56:>                                                        (0 + 0) / 10]

23/03/30 20:15:54 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB




23/03/30 20:15:55 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 70:>                                                        (0 + 0) / 10]

23/03/30 20:15:59 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


                                                                                

In [22]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)

In [23]:
predictions.select(["product_id_idx", "user_id_idx",
                    "rating", "prediction"]).show(5)

[Stage 71:>                                                         (0 + 0) / 7]

23/03/30 20:16:02 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 71:>                 (0 + 0) / 7][Stage 85:>                (0 + 0) / 10]

23/03/30 20:16:03 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 71:>   (0 + 0) / 7][Stage 85:>  (0 + 0) / 10][Stage 86:>  (0 + 0) / 10]

23/03/30 20:16:05 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB




23/03/30 20:16:08 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


                                                                                

+--------------+-----------+------+----------+
|product_id_idx|user_id_idx|rating|prediction|
+--------------+-----------+------+----------+
|        8638.0|       31.0|     5|  7.683725|
|         148.0|       31.0|     1|  2.396131|
|        2122.0|       34.0|     5| 4.6566224|
|        2122.0|      133.0|     5| 4.5032015|
|        1088.0|      148.0|     5| 4.7659764|
+--------------+-----------+------+----------+
only showing top 5 rows





In [24]:
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol="rating",
                                predictionCol="prediction") 
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

23/03/30 20:16:13 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 122:>                (0 + 0) / 7][Stage 136:>               (0 + 0) / 10]

23/03/30 20:16:14 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 122:>  (0 + 0) / 7][Stage 136:> (0 + 0) / 10][Stage 137:> (0 + 0) / 10]

23/03/30 20:16:16 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 137:>                                                      (0 + 10) / 10]

23/03/30 20:16:20 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


                                                                                

23/03/30 20:16:24 WARN DAGScheduler: Broadcasting large task binary with size 18.2 MiB




23/03/30 20:16:26 WARN DAGScheduler: Broadcasting large task binary with size 18.2 MiB


[Stage 173:>                                                        (0 + 3) / 3]

Root-mean-square error = 1.6412937567645896


                                                                                

### Result
- bad Result

In [25]:
#from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# initialize the ALS model
#als_model = ALS(userCol='user_id_idx', itemCol='product_id_idx', ratingCol='rating', coldStartStrategy='drop')
# create the parameter grid
#params = ParamGridBuilder().addGrid(als_model.regParam, [.01, .05, .1, .15]).addGrid(als_model.rank, [10, 50, 100, 150]).build()
#instantiating crossvalidator estimator
#cv = CrossValidator(estimator=als_model, estimatorParamMaps=params, evaluator=evaluator, parallelism=4)
#best_model = cv.fit(data_indexed)
#model = best_model.bestModel

### Tuning Model

In [28]:
als_t = ALS(maxIter=10,
         regParam=0.2,
            rank = 50,
         userCol="user_id_idx",
          itemCol="product_id_idx",
          ratingCol="rating",
         coldStartStrategy='drop',
         nonnegative=True)
model_t = als_t.fit(training)
predictions_t = model_t.transform(test)
rmse_t = evaluator.evaluate(predictions_t)

print('rootmse =' +str(rmse_t))

23/03/30 20:17:17 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


                                                                                

23/03/30 20:17:19 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB




23/03/30 20:17:21 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 176:>                                                        (0 + 7) / 7]

23/03/30 20:17:23 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


                                                                                

23/03/30 20:17:24 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB




23/03/30 20:17:26 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


                                                                                

23/03/30 20:17:27 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 183:>                                                      (0 + 10) / 10]

23/03/30 20:17:29 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 185:>                                                      (0 + 10) / 10]

23/03/30 20:17:31 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 186:>                                                      (0 + 10) / 10]

23/03/30 20:17:34 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 187:>                                                      (0 + 10) / 10]

23/03/30 20:17:37 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 188:>                                                      (0 + 10) / 10]

23/03/30 20:17:39 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 189:>                                                      (0 + 10) / 10]

23/03/30 20:17:41 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB




23/03/30 20:17:44 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 191:>                                                      (0 + 10) / 10]

23/03/30 20:17:46 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB




23/03/30 20:17:48 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 193:>                                                      (0 + 10) / 10]

23/03/30 20:17:50 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB




23/03/30 20:17:53 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB




23/03/30 20:17:56 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB




23/03/30 20:17:58 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 197:>                                                      (0 + 10) / 10]

23/03/30 20:18:00 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 198:>                                                      (0 + 10) / 10]

23/03/30 20:18:03 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB




23/03/30 20:18:04 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 200:>                                                      (0 + 10) / 10]

23/03/30 20:18:07 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 202:>                                                       (0 + 0) / 10]

23/03/30 20:18:09 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 202:>                                                      (0 + 10) / 10]

23/03/30 20:18:12 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 203:>                                                      (0 + 10) / 10]

23/03/30 20:18:13 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


                                                                                

23/03/30 20:18:16 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


                                                                                

23/03/30 20:18:18 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 228:>                (0 + 0) / 7][Stage 252:>               (0 + 0) / 10]

23/03/30 20:18:19 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 228:>  (0 + 0) / 7][Stage 252:> (0 + 0) / 10][Stage 253:> (0 + 0) / 10]

23/03/30 20:18:21 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 253:>                                                      (0 + 10) / 10]

23/03/30 20:18:23 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


                                                                                

23/03/30 20:18:28 WARN DAGScheduler: Broadcasting large task binary with size 18.2 MiB




23/03/30 20:18:30 WARN DAGScheduler: Broadcasting large task binary with size 18.2 MiB


[Stage 309:>                                                        (0 + 4) / 4]

rootmse =1.0815501961792808


                                                                                

### Comment:
- the result is better

### Recommend 

In [29]:
ser_recs = model_t.recommendForAllUsers(10)

In [30]:
for user in ser_recs.head(2):
    print(user)
    print("\n")

23/03/30 20:19:00 WARN DAGScheduler: Broadcasting large task binary with size 18.2 MiB


[Stage 358:>                                                        (0 + 0) / 1]

23/03/30 20:22:39 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


[Stage 358:>                                                        (0 + 1) / 1]

Row(user_id_idx=31, recommendations=[Row(product_id_idx=24311, rating=5.205354690551758), Row(product_id_idx=24530, rating=5.156948566436768), Row(product_id_idx=21747, rating=5.105452060699463), Row(product_id_idx=23243, rating=5.101497173309326), Row(product_id_idx=29670, rating=5.064798831939697), Row(product_id_idx=17271, rating=5.025424957275391), Row(product_id_idx=18881, rating=5.009730339050293), Row(product_id_idx=21690, rating=5.002012252807617), Row(product_id_idx=27479, rating=4.996496200561523), Row(product_id_idx=20285, rating=4.994470119476318)])


Row(user_id_idx=34, recommendations=[Row(product_id_idx=24311, rating=5.2627363204956055), Row(product_id_idx=29670, rating=5.251367092132568), Row(product_id_idx=24530, rating=5.145304203033447), Row(product_id_idx=21747, rating=5.055899143218994), Row(product_id_idx=25302, rating=5.027964115142822), Row(product_id_idx=22947, rating=5.023828506469727), Row(product_id_idx=27115, rating=5.017805099487305), Row(product_id_idx=16

                                                                                